diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala index 81f27cecac..066e76f9fe 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala @@ -228,8 +228,9 @@ abstract class ClusterShardingRememberEntitiesPerfSpec } awaitAssert({ - region ! GetShardRegionState - val stats = expectMsgType[CurrentShardRegionState] + val probe = TestProbe() + region.tell(GetShardRegionState, probe.ref) + val stats = probe.expectMsgType[CurrentShardRegionState] stats.shards.head.shardId shouldEqual "0" stats.shards.head.entityIds.toList.sorted shouldEqual List("0") // the init entity }, 2.seconds) diff --git a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java index 0b27cd1048..4f597bbb18 100644 --- a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java +++ b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java @@ -5732,6 +5732,10 @@ public final class ClusterShardingMessages { getEntityIdBytes(); } /** + *
+   * not written anymore but kept for backwards compatibility
+   * 
+ * * Protobuf type {@code EntityStarted} */ public static final class EntityStarted extends @@ -6026,6 +6030,10 @@ public final class ClusterShardingMessages { return builder; } /** + *
+     * not written anymore but kept for backwards compatibility
+     * 
+ * * Protobuf type {@code EntityStarted} */ public static final class Builder extends @@ -6979,6 +6987,10 @@ public final class ClusterShardingMessages { getEntityIdBytes(); } /** + *
+   * not written anymore but kept for backwards compatibility
+   * 
+ * * Protobuf type {@code EntityStopped} */ public static final class EntityStopped extends @@ -7273,6 +7285,10 @@ public final class ClusterShardingMessages { return builder; } /** + *
+     * not written anymore but kept for backwards compatibility
+     * 
+ * * Protobuf type {@code EntityStopped} */ public static final class Builder extends @@ -7567,6 +7583,643 @@ public final class ClusterShardingMessages { } + public interface EntitiesStoppedOrBuilder extends + // @@protoc_insertion_point(interface_extends:EntitiesStopped) + akka.protobufv3.internal.MessageOrBuilder { + + /** + * repeated string entityId = 1; + * @return A list containing the entityId. + */ + java.util.List + getEntityIdList(); + /** + * repeated string entityId = 1; + * @return The count of entityId. + */ + int getEntityIdCount(); + /** + * repeated string entityId = 1; + * @param index The index of the element to return. + * @return The entityId at the given index. + */ + java.lang.String getEntityId(int index); + /** + * repeated string entityId = 1; + * @param index The index of the value to return. + * @return The bytes of the entityId at the given index. + */ + akka.protobufv3.internal.ByteString + getEntityIdBytes(int index); + } + /** + * Protobuf type {@code EntitiesStopped} + */ + public static final class EntitiesStopped extends + akka.protobufv3.internal.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:EntitiesStopped) + EntitiesStoppedOrBuilder { + private static final long serialVersionUID = 0L; + // Use EntitiesStopped.newBuilder() to construct. + private EntitiesStopped(akka.protobufv3.internal.GeneratedMessageV3.Builder builder) { + super(builder); + } + private EntitiesStopped() { + entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance( + akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) { + return new EntitiesStopped(); + } + + @java.lang.Override + public final akka.protobufv3.internal.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private EntitiesStopped( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields = + akka.protobufv3.internal.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + akka.protobufv3.internal.ByteString bs = input.readBytes(); + if (!((mutable_bitField0_ & 0x00000001) != 0)) { + entityId_ = new akka.protobufv3.internal.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000001; + } + entityId_.add(bs); + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobufv3.internal.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) != 0)) { + entityId_ = entityId_.getUnmodifiableView(); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_descriptor; + } + + @java.lang.Override + 
protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.Builder.class); + } + + public static final int ENTITYID_FIELD_NUMBER = 1; + private akka.protobufv3.internal.LazyStringList entityId_; + /** + * repeated string entityId = 1; + * @return A list containing the entityId. + */ + public akka.protobufv3.internal.ProtocolStringList + getEntityIdList() { + return entityId_; + } + /** + * repeated string entityId = 1; + * @return The count of entityId. + */ + public int getEntityIdCount() { + return entityId_.size(); + } + /** + * repeated string entityId = 1; + * @param index The index of the element to return. + * @return The entityId at the given index. + */ + public java.lang.String getEntityId(int index) { + return entityId_.get(index); + } + /** + * repeated string entityId = 1; + * @param index The index of the value to return. + * @return The bytes of the entityId at the given index. + */ + public akka.protobufv3.internal.ByteString + getEntityIdBytes(int index) { + return entityId_.getByteString(index); + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(akka.protobufv3.internal.CodedOutputStream output) + throws java.io.IOException { + for (int i = 0; i < entityId_.size(); i++) { + akka.protobufv3.internal.GeneratedMessageV3.writeString(output, 1, entityId_.getRaw(i)); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + { + int dataSize = 0; + for (int i = 0; i < entityId_.size(); i++) { + dataSize += computeStringSizeNoTag(entityId_.getRaw(i)); + } + size += dataSize; + size += 1 * getEntityIdList().size(); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped)) { + return super.equals(obj); + } + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped other = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped) obj; + + if (!getEntityIdList() + .equals(other.getEntityIdList())) return false; + if (!unknownFields.equals(other.unknownFields)) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (getEntityIdCount() > 0) { + hash = (37 * hash) + ENTITYID_FIELD_NUMBER; + hash = (53 * hash) + getEntityIdList().hashCode(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + java.nio.ByteBuffer data) + throws 
akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + java.nio.ByteBuffer data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + akka.protobufv3.internal.ByteString data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + akka.protobufv3.internal.ByteString data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(byte[] data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + byte[] data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseDelimitedFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + akka.protobufv3.internal.CodedInputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return 
DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code EntitiesStopped} + */ + public static final class Builder extends + akka.protobufv3.internal.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:EntitiesStopped) + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStoppedOrBuilder { + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.Builder.class); + } + + // Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobufv3.internal.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + @java.lang.Override + public akka.protobufv3.internal.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_descriptor; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped getDefaultInstanceForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.getDefaultInstance(); + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped build() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped buildPartial() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped(this); + int from_bitField0_ = bitField0_; + if (((bitField0_ & 0x00000001) != 0)) { + entityId_ = entityId_.getUnmodifiableView(); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.entityId_ = entityId_; + onBuilt(); + return result; + } + + 
@java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(akka.protobufv3.internal.Message other) { + if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped) { + return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped other) { + if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.getDefaultInstance()) return this; + if (!other.entityId_.isEmpty()) { + if (entityId_.isEmpty()) { + entityId_ = other.entityId_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureEntityIdIsMutable(); + entityId_.addAll(other.entityId_); + } + onChanged(); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private akka.protobufv3.internal.LazyStringList entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + private void ensureEntityIdIsMutable() { + if (!((bitField0_ & 0x00000001) != 0)) { + entityId_ = new akka.protobufv3.internal.LazyStringArrayList(entityId_); + bitField0_ |= 0x00000001; + } + } + /** + * repeated string entityId = 1; + * @return A list containing the entityId. + */ + public akka.protobufv3.internal.ProtocolStringList + getEntityIdList() { + return entityId_.getUnmodifiableView(); + } + /** + * repeated string entityId = 1; + * @return The count of entityId. + */ + public int getEntityIdCount() { + return entityId_.size(); + } + /** + * repeated string entityId = 1; + * @param index The index of the element to return. + * @return The entityId at the given index. 
+ */ + public java.lang.String getEntityId(int index) { + return entityId_.get(index); + } + /** + * repeated string entityId = 1; + * @param index The index of the value to return. + * @return The bytes of the entityId at the given index. + */ + public akka.protobufv3.internal.ByteString + getEntityIdBytes(int index) { + return entityId_.getByteString(index); + } + /** + * repeated string entityId = 1; + * @param index The index to set the value at. + * @param value The entityId to set. + * @return This builder for chaining. + */ + public Builder setEntityId( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntityIdIsMutable(); + entityId_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @param value The entityId to add. + * @return This builder for chaining. + */ + public Builder addEntityId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntityIdIsMutable(); + entityId_.add(value); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @param values The entityId to add. + * @return This builder for chaining. + */ + public Builder addAllEntityId( + java.lang.Iterable values) { + ensureEntityIdIsMutable(); + akka.protobufv3.internal.AbstractMessageLite.Builder.addAll( + values, entityId_); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @return This builder for chaining. + */ + public Builder clearEntityId() { + entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + return this; + } + /** + * repeated string entityId = 1; + * @param value The bytes of the entityId to add. + * @return This builder for chaining. 
+ */ + public Builder addEntityIdBytes( + akka.protobufv3.internal.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureEntityIdIsMutable(); + entityId_.add(value); + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:EntitiesStopped) + } + + // @@protoc_insertion_point(class_scope:EntitiesStopped) + private static final akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped(); + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @java.lang.Deprecated public static final akka.protobufv3.internal.Parser + PARSER = new akka.protobufv3.internal.AbstractParser() { + @java.lang.Override + public EntitiesStopped parsePartialFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return new EntitiesStopped(input, extensionRegistry); + } + }; + + public static akka.protobufv3.internal.Parser parser() { + return PARSER; + } + + @java.lang.Override + public akka.protobufv3.internal.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + public interface ShardStatsOrBuilder extends // @@protoc_insertion_point(interface_extends:ShardStats) akka.protobufv3.internal.MessageOrBuilder { @@ -18914,6 +19567,11 @@ public final class ClusterShardingMessages { private static final akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable internal_static_EntityStopped_fieldAccessorTable; + private static final akka.protobufv3.internal.Descriptors.Descriptor + internal_static_EntitiesStopped_descriptor; + private static final + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internal_static_EntitiesStopped_fieldAccessorTable; private static final akka.protobufv3.internal.Descriptors.Descriptor internal_static_ShardStats_descriptor; private static final @@ -19005,29 +19663,30 @@ public final class ClusterShardingMessages { "\t\022\016\n\006region\030\002 \002(\t\"\037\n\013EntityState\022\020\n\010enti" + "ties\030\001 \003(\t\"!\n\rEntityStarted\022\020\n\010entityId\030" + "\001 \002(\t\"#\n\017EntitiesStarted\022\020\n\010entityId\030\001 \003" + - "(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t\"0\n" + - "\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityCoun" + - "t\030\002 \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030\001 \003" + - "(\0132\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n\rMa" + - "pFieldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\005\"" + - "/\n\027GetClusterShardingStats\022\024\n\014timeoutNan" + - "os\030\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005stat" + - "s\030\001 
\003(\0132\032.ClusterShardingStatsEntry\"X\n\031C" + - "lusterShardingStatsEntry\022\031\n\007address\030\001 \002(" + - "\0132\010.Address\022 \n\005stats\030\002 \002(\0132\021.ShardRegion" + - "Stats\"+\n\016CurrentRegions\022\031\n\007regions\030\001 \003(\013" + - "2\010.Address\"K\n\007Address\022\020\n\010protocol\030\001 \002(\t\022" + - "\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004por" + - "t\030\004 \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 \002(\t" + - "\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022\017\n\007" + - "shardId\030\002 \002(\t\"7\n\021CurrentShardState\022\017\n\007sh" + - "ardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"0\n\nShardS" + - "tate\022\017\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t" + - "\"F\n\027CurrentShardRegionState\022\033\n\006shards\030\001 " + - "\003(\0132\013.ShardState\022\016\n\006failed\030\002 \003(\t\"7\n\024Reme" + - "mberedShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n\006mar" + - "ker\030\002 \001(\010B&\n\"akka.cluster.sharding.proto" + - "buf.msgH\001" + "(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t\"#\n" + + "\017EntitiesStopped\022\020\n\010entityId\030\001 \003(\t\"0\n\nSh" + + "ardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityCount\030\002" + + " \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030\001 \003(\0132" + + "\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n\rMapFi" + + "eldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\005\"/\n\027" + + "GetClusterShardingStats\022\024\n\014timeoutNanos\030" + + "\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005stats\030\001" + + " \003(\0132\032.ClusterShardingStatsEntry\"X\n\031Clus" + + "terShardingStatsEntry\022\031\n\007address\030\001 \002(\0132\010" + + ".Address\022 \n\005stats\030\002 \002(\0132\021.ShardRegionSta" + + "ts\"+\n\016CurrentRegions\022\031\n\007regions\030\001 \003(\0132\010." 
+ + "Address\"K\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006" + + "system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004port\030\004" + + " \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 \002(\t\"3\n" + + "\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022\017\n\007sha" + + "rdId\030\002 \002(\t\"7\n\021CurrentShardState\022\017\n\007shard" + + "Id\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"0\n\nShardStat" + + "e\022\017\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"F\n" + + "\027CurrentShardRegionState\022\033\n\006shards\030\001 \003(\013" + + "2\013.ShardState\022\016\n\006failed\030\002 \003(\t\"7\n\024Remembe" + + "redShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n\006marker" + + "\030\002 \001(\010B&\n\"akka.cluster.sharding.protobuf" + + ".msgH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -19093,86 +19752,92 @@ public final class ClusterShardingMessages { akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_EntityStopped_descriptor, new java.lang.String[] { "EntityId", }); - internal_static_ShardStats_descriptor = + internal_static_EntitiesStopped_descriptor = getDescriptor().getMessageTypes().get(9); + internal_static_EntitiesStopped_fieldAccessorTable = new + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( + internal_static_EntitiesStopped_descriptor, + new java.lang.String[] { "EntityId", }); + internal_static_ShardStats_descriptor = + getDescriptor().getMessageTypes().get(10); internal_static_ShardStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ShardStats_descriptor, new java.lang.String[] { "Shard", "EntityCount", }); internal_static_ShardRegionStats_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(11); internal_static_ShardRegionStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ShardRegionStats_descriptor, new java.lang.String[] { "Stats", "Failed", }); internal_static_MapFieldEntry_descriptor = - getDescriptor().getMessageTypes().get(11); + getDescriptor().getMessageTypes().get(12); internal_static_MapFieldEntry_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_MapFieldEntry_descriptor, new java.lang.String[] { "Key", "Value", }); internal_static_GetClusterShardingStats_descriptor = - getDescriptor().getMessageTypes().get(12); + getDescriptor().getMessageTypes().get(13); internal_static_GetClusterShardingStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_GetClusterShardingStats_descriptor, new java.lang.String[] { "TimeoutNanos", }); internal_static_ClusterShardingStats_descriptor = - getDescriptor().getMessageTypes().get(13); + getDescriptor().getMessageTypes().get(14); internal_static_ClusterShardingStats_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ClusterShardingStats_descriptor, new java.lang.String[] { "Stats", }); internal_static_ClusterShardingStatsEntry_descriptor = - getDescriptor().getMessageTypes().get(14); + getDescriptor().getMessageTypes().get(15); internal_static_ClusterShardingStatsEntry_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( 
internal_static_ClusterShardingStatsEntry_descriptor, new java.lang.String[] { "Address", "Stats", }); internal_static_CurrentRegions_descriptor = - getDescriptor().getMessageTypes().get(15); + getDescriptor().getMessageTypes().get(16); internal_static_CurrentRegions_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_CurrentRegions_descriptor, new java.lang.String[] { "Regions", }); internal_static_Address_descriptor = - getDescriptor().getMessageTypes().get(16); + getDescriptor().getMessageTypes().get(17); internal_static_Address_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_Address_descriptor, new java.lang.String[] { "Protocol", "System", "Hostname", "Port", }); internal_static_StartEntity_descriptor = - getDescriptor().getMessageTypes().get(17); + getDescriptor().getMessageTypes().get(18); internal_static_StartEntity_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_StartEntity_descriptor, new java.lang.String[] { "EntityId", }); internal_static_StartEntityAck_descriptor = - getDescriptor().getMessageTypes().get(18); + getDescriptor().getMessageTypes().get(19); internal_static_StartEntityAck_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_StartEntityAck_descriptor, new java.lang.String[] { "EntityId", "ShardId", }); internal_static_CurrentShardState_descriptor = - getDescriptor().getMessageTypes().get(19); + getDescriptor().getMessageTypes().get(20); internal_static_CurrentShardState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_CurrentShardState_descriptor, new java.lang.String[] { "ShardId", "EntityIds", }); internal_static_ShardState_descriptor = - getDescriptor().getMessageTypes().get(20); + getDescriptor().getMessageTypes().get(21); internal_static_ShardState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_ShardState_descriptor, new java.lang.String[] { "ShardId", "EntityIds", }); internal_static_CurrentShardRegionState_descriptor = - getDescriptor().getMessageTypes().get(21); + getDescriptor().getMessageTypes().get(22); internal_static_CurrentShardRegionState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_CurrentShardRegionState_descriptor, new java.lang.String[] { "Shards", "Failed", }); internal_static_RememberedShardState_descriptor = - getDescriptor().getMessageTypes().get(22); + getDescriptor().getMessageTypes().get(23); internal_static_RememberedShardState_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_RememberedShardState_descriptor, diff --git a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto index f22582a815..bc5089644f 100644 --- a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto +++ b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto @@ -42,6 +42,7 @@ message EntityState { repeated string entities = 1; } +// not written anymore but kept for backwards compatibility message EntityStarted { required string entityId = 1; } @@ -50,10 +51,15 @@ message EntitiesStarted { repeated string entityId = 1; } +// not written anymore but kept for backwards compatibility message EntityStopped { required 
string entityId = 1; } +message EntitiesStopped { + repeated string entityId = 1; +} + message ShardStats { required string shard = 1; required int32 entityCount = 2; diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index c2505c045f..d0c1c1d6c5 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -26,6 +26,7 @@ import akka.cluster.sharding.internal.RememberEntitiesProvider import akka.cluster.sharding.internal.RememberEntityStarter import akka.coordination.lease.scaladsl.Lease import akka.coordination.lease.scaladsl.LeaseProvider +import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.pattern.pipe import akka.util.MessageBufferMap @@ -53,13 +54,13 @@ private[akka] object Shard { * When remembering entities and the entity stops without issuing a `Passivate`, we * restart it after a back off using this message. */ - final case class RestartEntity(entity: EntityId) extends RememberEntityCommand + final case class RestartTerminatedEntity(entity: EntityId) extends RememberEntityCommand /** * When initialising a shard with remember entities enabled the following message is used * to restart batches of entity actors at a time. */ - final case class RestartEntities(entity: Set[EntityId]) extends RememberEntityCommand + final case class RestartRememberedEntities(entity: Set[EntityId]) extends RememberEntityCommand /** * A query for information about the shard @@ -105,92 +106,205 @@ private[akka] object Shard { private final case class RememberedEntityIds(ids: Set[EntityId]) private final case class RememberEntityStoreCrashed(store: ActorRef) - private final case object AsyncWriteDone private val RememberEntityTimeoutKey = "RememberEntityTimeout" - case class RememberEntityTimeout(operation: RememberEntitiesShardStore.Command) + final case class RememberEntityTimeout(operation: RememberEntitiesShardStore.Command) // FIXME Leaving this on while we are working on the remember entities refactor // should it go in settings perhaps, useful for tricky sharding bugs? 
final val VerboseDebug = true - final case class StartEntityInternal(id: EntityId) + /** + * State machine for an entity: + * {{{ + * Entity id remembered on shard start +-------------------------+ restart (via region) + * +--------------------------------->| RememberedButNotCreated |------------------------------+ + * | +-------------------------+ | + * | | | + * | | early message for entity | + * | v | + * | Remember entities entity start +-------------------+ start stored and entity started | + * | +-----------------------> | RememberingStart |-------------+ v + * No state for id | | +-------------------+ | +------------+ + * +---+ | | +-------------> | Active | + * | |--------|--------|------------------------------------------------------------ +------------+ + * +---+ | Non remember entities entity start | + * ^ | | + * | | | + * | | restart after backoff entity terminated | + * | | or message for entity +-------------------+ without passivation | passivation initiated +-------------+ + * | +<------------------------------| WaitingForRestart |<-----------------+-----------+----------------------------> | Passivating | + * | | +-------------------+ | +-------------+ + * | | | | + * | | | entity terminated +--------------+ + * | | | v | + * | | There were buffered messages for entity | +-------------------+ | + * | +<---------------------------------------------------------------------+ | RememberingStop | | + * | +-------------------+ | + * | | | + * | | | + * +-------------------------------------------------------------------------------------------------------------------------------------------+<-------------- + * stop stored/passivation complete + * }}} + **/ + sealed trait EntityState { + def transition(newState: EntityState): EntityState + } - sealed trait EntityState + /** + * Empty state rather than using optionals, + * is never really kept track of but used to verify state transitions + * and as return value instead of null + */ + case object NoState extends EntityState { + override def transition(newState: EntityState): EntityState = newState match { + case RememberedButNotCreated => RememberedButNotCreated + case remembering: RememberingStart => remembering + case active: Active => active + case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + } + } /** * In this state we know the entity has been stored in the remember sore but * it hasn't been created yet. E.g. on restart when first getting all the * remembered entity ids */ - case object RememberedButNotCreated extends EntityState + // FIXME: since remember entities on start has a hop via region this could be a (small) resource leak + // if the shard extractor has changed, the entities will stay in the map forever as RememberedButNotCreated + // do we really need to track it? + case object RememberedButNotCreated extends EntityState { + override def transition(newState: EntityState): EntityState = newState match { + case NoState => RememberedButNotCreated + case active: Active => active + case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + } + } /** * When remember entities is enabled an entity is in this state while - * its existence is being recorded in the remember entities store. + * its existence is being recorded in the remember entities store, or while the stop is queued up + * to be stored in the next batch. 
*/ - case object Remembering extends EntityState + final case class RememberingStart(ackTo: Option[ActorRef]) extends EntityState { + override def transition(newState: EntityState): EntityState = newState match { + case active: Active => active + case r: RememberingStart => + ackTo match { + case None => r + case Some(_) => + r // FIXME is this really fine, replace ackTo with another one or None + } + case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + } + } + private val RememberingStartNoAck = RememberingStart(None) + + sealed trait StopReason + final case object PassivationComplete extends StopReason + final case object StartedElsewhere extends StopReason + + /** + * When remember entities is enabled an entity is in this state while + * its stop is being recorded in the remember entities store, or while the stop is queued up + * to be stored in the next batch. + */ + final case class RememberingStop(stopReason: StopReason) extends EntityState { + override def transition(newState: EntityState): EntityState = newState match { + case NoState => NoState + case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + } + } + sealed trait WithRef extends EntityState { def ref: ActorRef } - final case class Active(ref: ActorRef) extends WithRef - final case class Passivating(ref: ActorRef) extends WithRef - case object WaitingForRestart extends EntityState + final case class Active(ref: ActorRef) extends WithRef { + override def transition(newState: EntityState): EntityState = newState match { + case passivating: Passivating => passivating + case WaitingForRestart => WaitingForRestart + case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + } + } + final case class Passivating(ref: ActorRef) extends WithRef { + override def transition(newState: EntityState): EntityState = newState match { + case r: RememberingStop => r + case NoState => NoState + case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + } + } - /** - * Entity has been stopped. If the entity was pasivating it'll be removed from state - * once the stop has been recorded. - */ - case object Stopped extends EntityState - - final case class EntityTerminated(id: EntityId, ref: ActorRef) - - final class Entities(log: LoggingAdapter) { + case object WaitingForRestart extends EntityState { + override def transition(newState: EntityState): EntityState = newState match { + case remembering: RememberingStart => remembering + case active: Active => active + case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + } + } + final class Entities(log: LoggingAdapter, rememberingEntities: Boolean) { private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]() // needed to look up entity by reg when a Passivating is received private val byRef = new util.HashMap[ActorRef, EntityId]() + private val remembering = new util.HashSet[EntityId]() def alreadyRemembered(set: Set[EntityId]): Unit = { - set.foreach { id => - entities.put(id, RememberedButNotCreated) + set.foreach { entityId => + val state = entityState(entityId).transition(RememberedButNotCreated) + entities.put(entityId, state) } } - def remembering(entity: EntityId): Unit = { - entities.put(entity, Remembering) + def rememberingStart(entityId: EntityId, ackTo: Option[ActorRef]): Unit = { + // FIXME we queue these without bounds, is that ok? 
+ val newState: RememberingStart = ackTo match { + case None => RememberingStartNoAck // small optimization avoiding alloc for regular start + case some => RememberingStart(some) + } + val state = entityState(entityId).transition(newState) + entities.put(entityId, state) + if (rememberingEntities) + remembering.add(entityId) } - def terminated(ref: ActorRef): Unit = { - val id = byRef.get(ref) - entities.put(id, Stopped) - byRef.remove(ref) + def rememberingStop(entityId: EntityId, reason: StopReason): Unit = { + val state = entityState(entityId) + removeRefIfThereIsOne(state) + entities.put(entityId, state.transition(RememberingStop(reason))) + if (rememberingEntities) + remembering.add(entityId) } def waitingForRestart(id: EntityId): Unit = { - entities.get(id) match { + val state = entities.get(id) match { case wr: WithRef => byRef.remove(wr.ref) - case _ => + wr + case null => NoState + case other => other } - entities.put(id, WaitingForRestart) + entities.put(id, state.transition(WaitingForRestart)) } def removeEntity(entityId: EntityId): Unit = { - entities.get(entityId) match { - case wr: WithRef => - byRef.remove(wr.ref) - case _ => - } + val state = entityState(entityId) + // just verify transition + state.transition(NoState) + removeRefIfThereIsOne(state) entities.remove(entityId) + if (rememberingEntities) + remembering.remove(entityId) } def addEntity(entityId: EntityId, ref: ActorRef): Unit = { - entities.put(entityId, Active(ref)) + val state = entityState(entityId).transition(Active(ref)) + entities.put(entityId, state) byRef.put(ref, entityId) + if (rememberingEntities) + remembering.remove(entityId) } def entity(entityId: EntityId): OptionVal[ActorRef] = entities.get(entityId) match { case wr: WithRef => OptionVal.Some(wr.ref) case _ => OptionVal.None } - def entityState(id: EntityId): OptionVal[EntityState] = { - OptionVal(entities.get(id)) + def entityState(id: EntityId): EntityState = { + OptionVal(entities.get(id)).getOrElse(NoState) } def entityId(ref: ActorRef): OptionVal[EntityId] = OptionVal(byRef.get(ref)) @@ -202,15 +316,23 @@ private[akka] object Shard { } } def entityPassivating(entityId: EntityId): Unit = { - if (VerboseDebug) log.debug("{} passivating, all entities: {}", entityId, entities) + if (VerboseDebug) log.debug("[{}] passivating", entityId) entities.get(entityId) match { case wf: WithRef => - entities.put(entityId, Passivating(wf.ref)) + val state = entityState(entityId).transition(Passivating(wf.ref)) + entities.put(entityId, state) case other => throw new IllegalStateException( s"Tried to passivate entity without an actor ref $entityId. 
Current state $other") } } + private def removeRefIfThereIsOne(state: EntityState): Unit = { + state match { + case wr: WithRef => + byRef.remove(wr.ref) + case _ => + } + } import akka.util.ccompat.JavaConverters._ // only called once during handoff @@ -219,8 +341,31 @@ private[akka] object Shard { // only called for getting shard stats def activeEntityIds(): Set[EntityId] = byRef.values.asScala.toSet + /** + * @return (remembering start, remembering stop) + */ + def pendingRememberEntities(): (Map[EntityId, RememberingStart], Set[EntityId]) = { + if (remembering.isEmpty) { + (Map.empty, Set.empty) + } else { + val starts = Map.newBuilder[EntityId, RememberingStart] + val stops = Set.newBuilder[EntityId] + remembering.forEach(entityId => + entityState(entityId) match { + case r: RememberingStart => starts += (entityId -> r) + case _: RememberingStop => stops += entityId + case wat => throw new IllegalStateException(s"$entityId was in the remembering set but has state $wat") + }) + (starts.result(), stops.result()) + } + } + + def pendingRememberedEntitiesExist(): Boolean = !remembering.isEmpty + def entityIdExists(id: EntityId): Boolean = entities.get(id) != null def size: Int = entities.size + + override def toString: EntityId = entities.toString } } @@ -255,10 +400,8 @@ private[akka] class Shard( import ShardRegion.Passivate import ShardRegion.ShardInitialized import ShardRegion.handOffStopperProps - import settings.tuningParameters._ import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage - import akka.cluster.sharding.ShardRegion.ShardRegionCommand private val rememberEntitiesStore: Option[ActorRef] = rememberEntitiesProvider.map { provider => @@ -267,21 +410,9 @@ private[akka] class Shard( store } - private val rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy = { - import settings.tuningParameters._ - entityRecoveryStrategy match { - case "all" => EntityRecoveryStrategy.allStrategy() - case "constant" => - EntityRecoveryStrategy.constantStrategy( - context.system, - entityRecoveryConstantRateStrategyFrequency, - entityRecoveryConstantRateStrategyNumberOfEntities) - } - } - private val flightRecorder = ShardingFlightRecorder(context.system) - private val entities = new Entities(log) + private val entities = new Entities(log, settings.rememberEntities) private var lastMessageTimestamp = Map.empty[EntityId, Long] @@ -418,9 +549,12 @@ private[akka] class Shard( "Shard starting [{}] remembered entities using strategy [{}]", ids.size, rememberedEntitiesRecoveryStrategy) + // FIXME Separation of concerns: shouldn't this future juggling be part of the RememberEntityStarter actor instead? 
rememberedEntitiesRecoveryStrategy.recoverEntities(ids).foreach { scheduledRecovery => - import context.dispatcher - scheduledRecovery.filter(_.nonEmpty).map(RestartEntities).pipeTo(self) + scheduledRecovery + .filter(_.nonEmpty)(ExecutionContexts.parasitic) + .map(RestartRememberedEntities)(ExecutionContexts.parasitic) + .pipeTo(self) } } @@ -431,121 +565,185 @@ private[akka] class Shard( // ===== shard up and running ===== + // when not remembering entities, we stay in this state all the time def idle: Receive = { case Terminated(ref) => receiveTerminated(ref) - case EntityTerminated(id, ref) => entityTerminated(ref, id) case msg: CoordinatorMessage => receiveCoordinatorMessage(msg) case msg: RememberEntityCommand => receiveRememberEntityCommand(msg) - case msg: ShardRegion.StartEntity => startEntities(Map(msg.entityId -> Some(sender()))) + case msg: ShardRegion.StartEntity => startEntity(msg.entityId, Some(sender())) case msg: ShardRegion.StartEntityAck => receiveStartEntityAck(msg) - case msg: StartEntityInternal => startEntities(Map(msg.id -> None)) - case msg: ShardRegionCommand => receiveShardRegionCommand(msg) + case Passivate(stopMessage) => passivate(sender(), stopMessage) case msg: ShardQuery => receiveShardQuery(msg) case PassivateIdleTick => passivateIdleEntities() case msg: LeaseLost => receiveLeaseLost(msg) case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) - case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender(), OptionVal.None) + case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender()) } - def rememberRemove(entityId: String)(whenDone: () => Unit): Unit = { + def rememberUpdate(add: Set[EntityId] = Set.empty, remove: Set[EntityId] = Set.empty): Unit = { rememberEntitiesStore match { case None => - whenDone() + onUpdateDone(add, remove) case Some(store) => - flightRecorder.rememberEntityRemove(entityId) - sendToRememberStore(store, Set(entityId), RememberEntitiesShardStore.RemoveEntity(entityId))(whenDone) - } - } - def rememberAdd(entityIds: Set[EntityId])(whenDone: () => Unit): Unit = { - rememberEntitiesStore match { - case None => - whenDone() - case Some(store) => - entityIds.foreach { id => - entities.remembering(id) - flightRecorder.rememberEntityAdd(id) - } - sendToRememberStore(store, entityIds, RememberEntitiesShardStore.AddEntities(entityIds))(whenDone) + sendToRememberStore(store, storingStarts = add, storingStops = remove) } } - /** - * The whenDone callback should not call become either directly or by calling this method again - * as it uses become. - */ - def sendToRememberStore(store: ActorRef, entityIds: Set[EntityId], command: RememberEntitiesShardStore.Command)( - whenDone: () => Unit): Unit = { - if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds.mkString(", "), command) - val startTime = System.nanoTime() - store ! command + def sendToRememberStore(store: ActorRef, storingStarts: Set[EntityId], storingStops: Set[EntityId]): Unit = { + if (VerboseDebug) + log.debug( + "Remember update [{}] and stops [{}] triggered", + storingStarts.mkString(", "), + storingStops.mkString(", ")) + + storingStarts.foreach { entityId => + flightRecorder.rememberEntityAdd(entityId) + } + storingStops.foreach { id => + flightRecorder.rememberEntityRemove(id) + } + val startTimeNanos = System.nanoTime() + val update = RememberEntitiesShardStore.Update(started = storingStarts, stopped = storingStops) + store ! 
update timers.startSingleTimer( RememberEntityTimeoutKey, - RememberEntityTimeout(command), + RememberEntityTimeout(update), // FIXME this timeout needs to match the timeout used in the ddata shard write since that tries 3 times // and this could always fail before ddata store completes retrying writes settings.tuningParameters.updatingStateTimeout) - context.become(waitingForUpdate(Map.empty)) + context.become(waitingForRememberEntitiesStore(update, startTimeNanos)) + } - def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = { - // none of the current impls will send back a partial update, yet! - case RememberEntitiesShardStore.UpdateDone(ids) => - val duration = System.nanoTime() - startTime - if (VerboseDebug) log.debug("Update done for ids [{}]", ids.mkString(", ")) - flightRecorder.rememberEntityOperation(duration) - timers.cancel(RememberEntityTimeoutKey) - whenDone() - if (pendingStarts.isEmpty) { - if (VerboseDebug) log.debug("No pending entities, going to idle") - unstashAll() - context.become(idle) - } else { - if (VerboseDebug) - log.debug("New entities encountered while waiting starting those: [{}]", pendingStarts.keys.mkString(", ")) - startEntities(pendingStarts) - } - case RememberEntityTimeout(`command`) => - throw new RuntimeException( - s"Async write for entityIds $entityIds timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") - case msg: ShardRegion.StartEntity => + private def waitingForRememberEntitiesStore( + update: RememberEntitiesShardStore.Update, + startTimeNanos: Long): Receive = { + // none of the current impls will send back a partial update, yet! + case RememberEntitiesShardStore.UpdateDone(storedStarts, storedStops) => + val duration = System.nanoTime() - startTimeNanos + if (VerboseDebug) + log.debug( + "Update done for ids, started [{}], stopped [{}]. Duration {} ms", + storedStarts.mkString(", "), + storedStops.mkString(", "), + duration.nanos.toMillis) + flightRecorder.rememberEntityOperation(duration) + timers.cancel(RememberEntityTimeoutKey) + onUpdateDone(storedStarts, storedStops) + + case RememberEntityTimeout(`update`) => + log.error("Remember entity store did not respond, crashing shard") + throw new RuntimeException( + s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") + case ShardRegion.StartEntity(entityId) => + if (!entities.entityIdExists(entityId)) { if (VerboseDebug) log.debug( - "Start entity while a write already in progress. Pending writes {}. Writes in progress {}", - pendingStarts, - entityIds) - if (!entities.entityIdExists(msg.entityId)) - context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender())))) + "StartEntity([{}]) from [{}] while a write already in progress. Marking as pending", + entityId, + sender()) + entities.rememberingStart(entityId, ackTo = Some(sender())) + } else { + // it's already running, ack immediately + sender() ! 
ShardRegion.StartEntityAck(entityId, shardId) + } - // below cases should handle same messages as in idle - case _: Terminated => stash() - case _: EntityTerminated => stash() - case _: CoordinatorMessage => stash() - case _: RememberEntityCommand => stash() - case _: ShardRegion.StartEntityAck => stash() - case _: StartEntityInternal => stash() - case _: ShardRegionCommand => stash() - case msg: ShardQuery => receiveShardQuery(msg) - case PassivateIdleTick => stash() - case msg: LeaseLost => receiveLeaseLost(msg) - case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) - case msg if extractEntityId.isDefinedAt(msg) => - // FIXME now the delivery logic is again spread out across two places, is this needed over what is in deliverMessage? - val (id, _) = extractEntityId(msg) - if (entities.entityIdExists(id)) { - if (VerboseDebug) log.debug("Entity already known about. Try and deliver. [{}]", id) - deliverMessage(msg, sender(), OptionVal.Some(entityIds)) - } else { - if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. [{}]", id) - appendToMessageBuffer(id, msg, sender()) - context.become(waitingForUpdate(pendingStarts + (id -> None))) + case Terminated(ref) => receiveTerminated(ref) + case _: CoordinatorMessage => stash() + case RestartTerminatedEntity(entityId) => + entities.entityState(entityId) match { + case WaitingForRestart => + if (VerboseDebug) + log.debug("Restarting terminated entity [{}]", entityId) + getOrCreateEntity(entityId) + case other => + throw new IllegalStateException( + s"Got RestartTerminatedEntity for [$entityId] but it's not waiting to be restarted. Actual state [$other]") + } + case RestartRememberedEntities(entities) => restartEntities(entities) + case l: LeaseLost => receiveLeaseLost(l) + case ShardRegion.StartEntityAck(entityId, _) => + if (update.started.contains(entityId)) { + // currently in progress of starting, so we'll need to stop it when that is done + entities.rememberingStop(entityId, StartedElsewhere) + } else { + entities.entityState(entityId) match { + case _: RememberingStart => + // queued up for batched start, let's just not start it + entities.removeEntity(entityId) + case _: Active => + // add to stop batch + entities.rememberingStop(entityId, StartedElsewhere) + case _ => + // not sure for other states, so deal with it later + stash() } - case msg => - // shouldn't be any other message types, but just in case - log.warning( - "Stashing unexpected message [{}] while waiting for remember entities update of [{}]", - msg.getClass, - entityIds.mkString(", ")) - stash() + } + case ShardRegion.Passivate(stopMessage) => + if (VerboseDebug) + log.debug( + "Passivation of [{}] arrived while updating", + entities.entityId(sender()).getOrElse(s"Unknown actor ${sender()}")) + passivate(sender(), stopMessage) + case msg: ShardQuery => receiveShardQuery(msg) + case PassivateIdleTick => stash() + case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) + case msg if extractEntityId.isDefinedAt(msg) => + deliverMessage(msg, sender()) + case msg => + // shouldn't be any other message types, but just in case + log.warning( + "Stashing unexpected message [{}] while waiting for remember entities update of starts [{}], stops [{}]", + msg.getClass, + update.started.mkString(", "), + update.stopped.mkString(", ")) + stash() + + } + + def onUpdateDone(starts: Set[EntityId], stops: Set[EntityId]): Unit = { + // entities can contain both ids from start/stops and pending ones, so we need + // to mark the completed 
ones as complete to get the set of pending ones + starts.foreach { entityId => + val stateBeforeStart = entities.entityState(entityId) + // this will start the entity and transition the entity state to active + getOrCreateEntity(entityId) + sendMsgBuffer(entityId) + stateBeforeStart match { + case RememberingStart(Some(ackTo)) => ackTo ! ShardRegion.StartEntityAck(entityId, shardId) + case _ => + } + touchLastMessageTimestamp(entityId) + } + stops.foreach { entityId => + entities.entityState(entityId) match { + case RememberingStop(PassivationComplete) => + // this updates entity state + passivateCompleted(entityId) + case RememberingStop(StartedElsewhere) => + // Drop buffered messages if any (to not cause re-ordering) + messageBuffers.remove(entityId) + entities.removeEntity(entityId) + case state => + throw new IllegalStateException( + s"Unexpected state [$state] when storing stop completed for entity id [$entityId]") + } + } + + val (pendingStarts, pendingStops) = entities.pendingRememberEntities() + if (pendingStarts.isEmpty && pendingStops.isEmpty) { + if (VerboseDebug) log.debug("Update complete, no pending updates, going to idle") + unstashAll() + context.become(idle) + } else { + // Note: no unstashing as long as we are batching, is that a problem? + val pendingStartIds = pendingStarts.keySet + if (VerboseDebug) + log.debug( + "Update complete, pending updates, doing another write. Starts [{}], stops [{}]", + pendingStartIds.mkString(", "), + pendingStops.mkString(", ")) + rememberUpdate(pendingStartIds, pendingStops) } } @@ -565,41 +763,46 @@ private[akka] class Shard( } private def receiveRememberEntityCommand(msg: RememberEntityCommand): Unit = msg match { - // these are only used with remembering entities upon start - case RestartEntity(id) => - // starting because it was remembered as started on shard startup (note that a message starting - // it up could already have arrived and in that case it will already be started) - getOrCreateEntity(id) - case RestartEntities(ids) => restartEntities(ids) + case RestartTerminatedEntity(entityId) => + entities.entityState(entityId) match { + case WaitingForRestart => + if (VerboseDebug) log.debug("Restarting unexpectedly terminated entity [{}]", entityId) + getOrCreateEntity(entityId) + case Active(_) => + // it could already have been started, that's fine + if (VerboseDebug) log.debug("Got RestartTerminatedEntity for [{}] but it is already running", entityId) + case other => + throw new IllegalStateException( + s"Unexpected state for [$entityId] when getting RestartTerminatedEntity: [$other]") + } + + case RestartRememberedEntities(ids) => restartEntities(ids) } // this could be because of a start message or due to a new message for the entity // if it is a start entity then start entity ack is sent after it is created - private def startEntities(entities: Map[EntityId, Option[ActorRef]]): Unit = { - val alreadyStarted = entities.filterKeys(entity => this.entities.entityIdExists(entity)) - val needStarting = entities -- alreadyStarted.keySet - if (log.isDebugEnabled) { - log.debug( - "Request to start entities. Already started [{}]. 
Need starting [{}]", - alreadyStarted.keys.mkString(", "), - needStarting.keys.mkString(", ")) - } - alreadyStarted.foreach { - case (entityId, requestor) => + private def startEntity(entityId: EntityId, ackTo: Option[ActorRef]): Unit = { + entities.entityState(entityId) match { + case Active(_) => + log.debug("Request to start entity [{}] (Already started)", entityId) + touchLastMessageTimestamp(entityId) + ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) + case _: RememberingStart => + // FIXME safe to replace ackTo? + entities.rememberingStart(entityId, ackTo) + case RememberedButNotCreated => + // already remembered, just start it - this will be the normal path for initially remembered entities + log.debug("Request to start (already remembered) entity [{}]", entityId) getOrCreateEntity(entityId) touchLastMessageTimestamp(entityId) - requestor.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) - } - - if (needStarting.nonEmpty) { - rememberAdd(needStarting.keySet) { () => - needStarting.foreach { - case (entityId, requestor) => - getOrCreateEntity(entityId) - sendMsgBuffer(entityId) - requestor.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) - } - } + ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) + case NoState => + log.debug("Request to start entity [{}] and ack to [{}]", entityId, ackTo) + entities.rememberingStart(entityId, ackTo) + rememberUpdate(add = Set(entityId)) + case other => + // FIXME what do we do here? + throw new IllegalStateException(s"Unhandled state when wanting to start $entityId: $other") } } @@ -607,18 +810,11 @@ private[akka] class Shard( if (ack.shardId != shardId && entities.entityIdExists(ack.entityId)) { log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId) - rememberRemove(ack.entityId) { () => - entities.removeEntity(ack.entityId) - messageBuffers.remove(ack.entityId) - } + entities.rememberingStop(ack.entityId, StartedElsewhere) + rememberUpdate(remove = Set(ack.entityId)) } } - private def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { - case Passivate(stopMessage) => passivate(sender(), stopMessage) - case _ => unhandled(msg) - } - private def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match { case HandOff(`shardId`) => handOff(sender()) case HandOff(shard) => log.warning("Shard [{}] can not hand off for another Shard [{}]", shardId, shard) @@ -626,8 +822,11 @@ private[akka] class Shard( } def receiveShardQuery(msg: ShardQuery): Unit = msg match { - case GetCurrentShardState => sender() ! CurrentShardState(shardId, entities.activeEntityIds()) - case GetShardStats => sender() ! ShardStats(shardId, entities.size) + case GetCurrentShardState => + if (VerboseDebug) + log.debug("GetCurrentShardState, full state: [{}], active: [{}]", entities, entities.activeEntityIds()) + sender() ! CurrentShardState(shardId, entities.activeEntityIds()) + case GetShardStats => sender() ! 
ShardStats(shardId, entities.size) } private def handOff(replyTo: ActorRef): Unit = handOffStopper match { @@ -640,6 +839,7 @@ private[akka] class Shard( if (activeEntities.nonEmpty) { val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds) log.debug("Starting HandOffStopper for shard [{}] to terminate [{}] entities.", shardId, activeEntities.size) + activeEntities.foreach(context.unwatch(_)) handOffStopper = Some( context.watch(context.actorOf( handOffStopperProps(shardId, replyTo, activeEntities, handOffStopMessage, entityHandOffTimeout)))) @@ -660,55 +860,67 @@ private[akka] class Shard( else { // workaround for watchWith not working with stash #29101 entities.entityId(ref) match { - case OptionVal.Some(id) => entityTerminated(ref, id) + case OptionVal.Some(id) => entityTerminated(id) case _ => } } } @InternalStableApi - def entityTerminated(ref: ActorRef, id: EntityId): Unit = { + def entityTerminated(entityId: EntityId): Unit = { import settings.tuningParameters._ - val isPassivating = entities.isPassivating(id) - entities.terminated(ref) if (passivateIdleTask.isDefined) { - lastMessageTimestamp -= id + lastMessageTimestamp -= entityId } - if (messageBuffers.getOrEmpty(id).nonEmpty) { - // Note; because we're not persisting the EntityStopped, we don't need - // to persist the EntityStarted either. - log.debug("Starting entity [{}] again, there are buffered messages for it", id) - flightRecorder.entityPassivateRestart(id) - // this will re-add the entity back - sendMsgBuffer(id) - } else { - if (!isPassivating) { - log.debug("Entity [{}] stopped without passivating, will restart after backoff", id) - entities.waitingForRestart(id) - import context.dispatcher - context.system.scheduler.scheduleOnce(entityRestartBackoff, self, RestartEntity(id)) - } else { - // FIXME optional wait for completion as optimization where stops are not critical - rememberRemove(id)(() => passivateCompleted(id)) - } + entities.entityState(entityId) match { + case RememberingStop(_) => + if (VerboseDebug) + log.debug("Stop of [{}] arrived, already is among the pending stops", entityId) + case Active(_) => + log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId) + entities.waitingForRestart(entityId) + val msg = RestartTerminatedEntity(entityId) + timers.startSingleTimer(msg, msg, entityRestartBackoff) + + case Passivating(_) => + if (entities.pendingRememberedEntitiesExist()) { + // will go in next batch update + if (VerboseDebug) + log.debug( + "Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops", + entityId) + entities.rememberingStop(entityId, PassivationComplete) + } else { + entities.rememberingStop(entityId, PassivationComplete) + rememberUpdate(remove = Set(entityId)) + } + case unexpected => + val ref = entities.entity(entityId) + log.warning( + "Got a terminated for [{}], entityId [{}] which is in unexpected state [{}]", + ref, + entityId, + unexpected) } } private def passivate(entity: ActorRef, stopMessage: Any): Unit = { entities.entityId(entity) match { case OptionVal.Some(id) => - if (!messageBuffers.contains(id)) { + if (entities.isPassivating(id)) { + log.debug("Passivation already in progress for [{}]. Not sending stopMessage back to entity", id) + } else if (messageBuffers.getOrEmpty(id).nonEmpty) { + log.debug("Passivation when there are buffered messages for [{}], ignoring", id) + // FIXME should we buffer the stop message then? 
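+ // as the code stands, the stop message is neither sent nor buffered here: the passivation request is simply dropped and the entity keeps running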
+ } else { if (VerboseDebug) - log.debug("Passivation started for {}", entity) - + log.debug("Passivation started for [{}]", id) entities.entityPassivating(id) - messageBuffers.add(id) entity ! stopMessage flightRecorder.entityPassivate(id) - } else { - log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity", entity) } - case OptionVal.None => log.debug("Unknown entity {}. Not sending stopMessage back to entity", entity) + case OptionVal.None => + log.debug("Unknown entity passivating [{}]. Not sending stopMessage back to entity", entity) } } @@ -736,66 +948,75 @@ private[akka] class Shard( entities.removeEntity(entityId) if (hasBufferedMessages) { log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId) - // in case we're already writing to the remember store flightRecorder.entityPassivateRestart(entityId) - self ! StartEntityInternal(entityId) + // trigger start or batch in case we're already writing to the remember store + entities.rememberingStart(entityId, None) + if (!entities.pendingRememberedEntitiesExist()) rememberUpdate(Set(entityId)) } else { log.debug("Entity stopped after passivation [{}]", entityId) - dropBufferFor(entityId) } } - /** - * @param entityIdsWaitingForWrite ids for remembered entities that have a write in progress, if non empty messages for that id - * will be buffered - */ - private def deliverMessage(msg: Any, snd: ActorRef, entityIdsWaitingForWrite: OptionVal[Set[EntityId]]): Unit = { - val (id, payload) = extractEntityId(msg) - if (id == null || id == "") { + private def deliverMessage(msg: Any, snd: ActorRef): Unit = { + val (entityId, payload) = extractEntityId(msg) + if (entityId == null || entityId == "") { log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName) context.system.deadLetters ! msg } else { payload match { case start: ShardRegion.StartEntity => + // Handling StartEntity both here and in the receives allows for sending it both as is and in an envelope + // to be extracted by the entity id extractor. 
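+ // Illustration only, a hedged sketch of an extractor that supports both paths (EntityEnvelope(id, payload) is a hypothetical user-defined message, not part of this change):
+ //   val extractEntityId: ShardRegion.ExtractEntityId = {
+ //     case start @ ShardRegion.StartEntity(id) => (id, start)
+ //     case EntityEnvelope(id, payload)         => (id, payload)
+ //   }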
+ // we can only start a new entity if we are not currently waiting for another write - if (entityIdsWaitingForWrite.isEmpty) startEntities(Map(start.entityId -> Some(sender()))) - // write in progress, see waitForAsyncWrite for unstash - else stash() + if (entities.pendingRememberedEntitiesExist()) { + if (VerboseDebug) + log.debug("StartEntity({}) from [{}], adding to batch", start.entityId, snd) + entities.rememberingStart(entityId, ackTo = Some(snd)) + } else { + if (VerboseDebug) + log.debug("StartEntity({}) from [{}], starting", start.entityId, snd) + startEntity(start.entityId, Some(sender())) + } case _ => - entities.entityState(id) match { - case OptionVal.Some(Remembering) => - appendToMessageBuffer(id, msg, snd) - case OptionVal.Some(Passivating(_)) => - appendToMessageBuffer(id, msg, snd) - case OptionVal.Some(Active(ref)) => - if (VerboseDebug) log.debug("Delivering message of type [{}] to [{}]", payload.getClass, id) - touchLastMessageTimestamp(id) + entities.entityState(entityId) match { + case Active(ref) => + if (VerboseDebug) + log.debug("Delivering message of type [{}] to [{}]", payload.getClass, entityId) + touchLastMessageTimestamp(entityId) ref.tell(payload, snd) - case OptionVal.Some(RememberedButNotCreated | WaitingForRestart) => + case RememberingStart(_) | RememberingStop(_) | Passivating(_) => + appendToMessageBuffer(entityId, msg, snd) + case state @ (WaitingForRestart | RememberedButNotCreated) => if (VerboseDebug) log.debug( - "Delivering message of type [{}] to [{}] (starting because known but not running)", + "Delivering message of type [{}] to [{}] (starting because [{}])", payload.getClass, - id) - val actor = getOrCreateEntity(id) - touchLastMessageTimestamp(id) + entityId, + state) + val actor = getOrCreateEntity(entityId) + touchLastMessageTimestamp(entityId) actor.tell(payload, snd) - case OptionVal.None if entityIdsWaitingForWrite.isEmpty => - // No actor running and no write in progress, start actor and deliver message when started - if (VerboseDebug) - log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id) - appendToMessageBuffer(id, msg, snd) - rememberAdd(Set(id))(() => sendMsgBuffer(id)) - case OptionVal.None => - // No actor running and write in progress for some other entity id, stash message for deliver when - // unstash happens on async write complete - if (VerboseDebug) - log.debug( - "Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]", - payload.getClass, - id, - entityIdsWaitingForWrite.get) - appendToMessageBuffer(id, msg, snd) + case NoState => + if (entities.pendingRememberedEntitiesExist()) { + // No actor running and write in progress for some other entity id (can only happen with remember entities enabled) + if (VerboseDebug) + log.debug( + "Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]", + payload.getClass, + entityId, + entities.pendingRememberEntities()) + appendToMessageBuffer(entityId, msg, snd) + entities.rememberingStart(entityId, ackTo = None) + } else { + // No actor running and no write in progress, start actor and deliver message when started + if (VerboseDebug) + log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, entityId) + appendToMessageBuffer(entityId, msg, snd) + entities.rememberingStart(entityId, ackTo = None) + rememberUpdate(add = Set(entityId)) + } + } } } @@ -818,7 +1039,7 @@ private[akka] class Shard( // ===== buffering while busy saving a start or stop when remembering 
entities ===== def appendToMessageBuffer(id: EntityId, msg: Any, snd: ActorRef): Unit = { - if (messageBuffers.totalSize >= bufferSize) { + if (messageBuffers.totalSize >= settings.tuningParameters.bufferSize) { if (log.isDebugEnabled) log.debug("Buffer is full, dropping message of type [{}] for entity [{}]", msg.getClass.getName, id) context.system.deadLetters ! msg @@ -838,10 +1059,10 @@ private[akka] class Shard( if (messages.nonEmpty) { getOrCreateEntity(entityId) log.debug("Sending message buffer for entity [{}] ([{}] messages)", entityId, messages.size) - //Now there is no deliveryBuffer we can try to redeliver + // Now there is no deliveryBuffer we can try to redeliver // and as the child exists, the message will be directly forwarded messages.foreach { - case (msg, snd) => deliverMessage(msg, snd, OptionVal.None) + case (msg, snd) => deliverMessage(msg, snd) } touchLastMessageTimestamp(entityId) } @@ -862,5 +1083,18 @@ private[akka] class Shard( override def postStop(): Unit = { passivateIdleTask.foreach(_.cancel()) + log.debug("Shard [{}] shutting down", shardId) + } + + private def rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy = { + import settings.tuningParameters._ + entityRecoveryStrategy match { + case "all" => EntityRecoveryStrategy.allStrategy() + case "constant" => + EntityRecoveryStrategy.constantStrategy( + context.system, + entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities) + } } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala index 8d7f268200..2b2bdfbf1a 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala @@ -29,7 +29,6 @@ import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.sharding.ClusterShardingSettings import akka.cluster.sharding.ShardRegion.EntityId import akka.cluster.sharding.ShardRegion.ShardId -import akka.cluster.sharding.internal.RememberEntitiesShardStore.{ AddEntities, RemoveEntity } import akka.util.PrettyDuration._ import scala.concurrent.ExecutionContext @@ -59,6 +58,12 @@ private[akka] object DDataRememberEntitiesShardStore { private def stateKeys(typeName: String, shardId: ShardId): Array[ORSetKey[EntityId]] = Array.tabulate(numberOfKeys)(i => ORSetKey[EntityId](s"shard-$typeName-$shardId-$i")) + private sealed trait Evt { + def id: EntityId + } + private case class Started(id: EntityId) extends Evt + private case class Stopped(id: EntityId) extends Evt + } /** @@ -104,9 +109,8 @@ private[akka] final class DDataRememberEntitiesShardStore( override def receive: Receive = idle def idle: Receive = { - case RememberEntitiesShardStore.GetEntities => onGetEntities() - case AddEntities(ids) => addEntities(ids) - case RemoveEntity(id) => removeEntity(id) + case RememberEntitiesShardStore.GetEntities => onGetEntities() + case update: RememberEntitiesShardStore.Update => onUpdate(update) } def waitingForAllEntityIds(requestor: ActorRef, gotKeys: Set[Int], ids: Set[EntityId]): Receive = { @@ -138,78 +142,82 @@ private[akka] final class DDataRememberEntitiesShardStore( case GetDataDeleted(_, _) => log.error("Unable to get an initial state because it was deleted") context.stop(self) + case update: RememberEntitiesShardStore.Update => + 
log.warning("Got an update before load of initial entities completed, dropping update: [{}]", update) } } - private def addEntities(ids: Set[EntityId]): Unit = { - val updates: Map[Set[EntityId], (Update[ORSet[EntityId]], Int)] = ids.groupBy(key).map { - case (key, ids) => - (ids, (Update(key, ORSet.empty[EntityId], writeMajority, Some(ids)) { existing => - ids.foldLeft(existing) { - case (acc, nextId) => acc :+ nextId - } - }, maxUpdateAttempts)) - } + private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = { + // FIXME what about ordering of adds/removes vs sets, I think we can lose one + val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped)) + // map from set of evts (for same ddata key) to one update that applies each of them + val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] = + allEvts.groupBy(evt => key(evt.id)).map { + case (key, evts) => + (evts, (Update(key, ORSet.empty[EntityId], writeMajority, Some(evts)) { existing => + evts.foldLeft(existing) { + case (acc, Started(id)) => acc :+ id + case (acc, Stopped(id)) => acc.remove(id) + } + }, maxUpdateAttempts)) + } - updates.foreach { + ddataUpdates.foreach { case (_, (update, _)) => replicator ! update } - context.become(waitingForUpdates(sender(), ids, updates)) - } - - private def removeEntity(id: EntityId): Unit = { - val keyForEntity = key(id) - val update = Update(keyForEntity, ORSet.empty[EntityId], writeMajority, Some(Set(id))) { existing => - existing.remove(id) - } - replicator ! update - - context.become(waitingForUpdates(sender(), Set(id), Map((Set(id), (update, maxUpdateAttempts))))) + context.become(waitingForUpdates(sender(), update, ddataUpdates)) } private def waitingForUpdates( requestor: ActorRef, - allIds: Set[EntityId], - updates: Map[Set[EntityId], (Update[ORSet[EntityId]], Int)]): Receive = { - case UpdateSuccess(_, Some(ids: Set[EntityId] @unchecked)) => - if (log.isDebugEnabled) - log.debug("The DDataShard state was successfully updated for [{}]", ids.mkString(", ")) - val remaining = updates - ids - if (remaining.isEmpty) { - requestor ! RememberEntitiesShardStore.UpdateDone(allIds) - context.become(idle) - } else { - context.become(waitingForUpdates(requestor, allIds, remaining)) - } + update: RememberEntitiesShardStore.Update, + allUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)]): Receive = { - case UpdateTimeout(_, Some(ids: Set[EntityId] @unchecked)) => - val (update, retriesLeft) = updates(ids) - if (retriesLeft > 0) { - log.debug("Retrying update because of write timeout, tries left [{}]", retriesLeft) - replicator ! update - context.become(waitingForUpdates(requestor, allIds, updates.updated(ids, (update, retriesLeft - 1)))) - } else { - log.error( - "Unable to update state, within 'updating-state-timeout'= [{}], gave up after [{}] retries", - writeMajority.timeout.pretty, - maxUpdateAttempts) + // updatesLeft used both to keep track of what work remains and for retrying on timeout up to a limit + def next(updatesLeft: Map[Set[Evt], (Update[ORSet[EntityId]], Int)]): Receive = { + case UpdateSuccess(_, Some(evts: Set[Evt] @unchecked)) => + log.debug("The DDataShard state was successfully updated for [{}]", evts) + val remainingAfterThis = updatesLeft - evts + if (remainingAfterThis.isEmpty) { + requestor ! 
RememberEntitiesShardStore.UpdateDone(update.started, update.stopped) + context.become(idle) + } else { + context.become(next(remainingAfterThis)) + } + + case UpdateTimeout(_, Some(evts: Set[Evt] @unchecked)) => + val (updateForEvts, retriesLeft) = updatesLeft(evts) + if (retriesLeft > 0) { + log.debug("Retrying update because of write timeout, tries left [{}]", retriesLeft) + replicator ! updateForEvts + context.become(next(updatesLeft.updated(evts, (updateForEvts, retriesLeft - 1)))) + } else { + log.error( + "Unable to update state, within 'updating-state-timeout'= [{}], gave up after [{}] retries", + writeMajority.timeout.pretty, + maxUpdateAttempts) + // will trigger shard restart + context.stop(self) + } + case StoreFailure(_, _) => + log.error("Unable to update state, due to store failure") // will trigger shard restart context.stop(self) - } - case StoreFailure(_, _) => - log.error("Unable to update state, due to store failure") - // will trigger shard restart - context.stop(self) - case ModifyFailure(_, error, cause, _) => - log.error(cause, "Unable to update state, due to modify failure: {}", error) - // will trigger shard restart - context.stop(self) - case UpdateDataDeleted(_, _) => - log.error("Unable to update state, due to delete") - // will trigger shard restart - context.stop(self) + case ModifyFailure(_, error, cause, _) => + log.error(cause, "Unable to update state, due to modify failure: {}", error) + // will trigger shard restart + context.stop(self) + case UpdateDataDeleted(_, _) => + log.error("Unable to update state, due to delete") + // will trigger shard restart + context.stop(self) + case update: RememberEntitiesShardStore.Update => + log.warning("Got a new update before write of previous completed, dropping update: [{}]", update) + } + + next(allUpdates) } private def onGetEntities(): Unit = { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala index dabec1e5fd..682ce661d1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala @@ -60,14 +60,14 @@ private[akka] object EventSourcedRememberEntitiesStore { /** * `State` change for starting a set of entities in this `Shard` */ - final case class EntitiesStarted(entities: Set[String]) extends StateChange + final case class EntitiesStarted(entities: Set[EntityId]) extends StateChange case object StartedAck /** * `State` change for an entity which has terminated. 
*/ - final case class EntityStopped(entityId: EntityId) extends StateChange + final case class EntitiesStopped(entities: Set[EntityId]) extends StateChange def props(typeName: String, shardId: ShardRegion.ShardId, settings: ClusterShardingSettings): Props = Props(new EventSourcedRememberEntitiesStore(typeName, shardId, settings)) @@ -98,25 +98,28 @@ private[akka] final class EventSourcedRememberEntitiesStore( override def receiveRecover: Receive = { case EntitiesStarted(ids) => state = state.copy(state.entities.union(ids)) - case EntityStopped(id) => state = state.copy(state.entities - id) + case EntitiesStopped(ids) => state = state.copy(state.entities -- ids) case SnapshotOffer(_, snapshot: State) => state = snapshot case RecoveryCompleted => log.debug("Recovery completed for shard [{}] with [{}] entities", shardId, state.entities.size) } override def receiveCommand: Receive = { - case RememberEntitiesShardStore.AddEntities(ids) => - persist(EntitiesStarted(ids)) { started => - sender() ! RememberEntitiesShardStore.UpdateDone(ids) - state.copy(state.entities ++ started.entities) - saveSnapshotWhenNeeded() - } - case RememberEntitiesShardStore.RemoveEntity(id) => - persist(EntityStopped(id)) { stopped => - sender() ! RememberEntitiesShardStore.UpdateDone(Set(id)) - state.copy(state.entities - stopped.entityId) - saveSnapshotWhenNeeded() + + case RememberEntitiesShardStore.Update(started, stopped) => + val events = + (if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) ::: + (if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil) + var left = events.size + persistAll(events) { _ => + left -= 1 + if (left == 0) { + sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped) + state.copy(state.entities.union(started) -- stopped) + saveSnapshotWhenNeeded() + } } + case RememberEntitiesShardStore.GetEntities => sender() ! 
RememberEntitiesShardStore.RememberedEntities(state.entities) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala index eff58885d9..ef1cb99672 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/RememberEntitiesStore.scala @@ -46,10 +46,10 @@ private[akka] object RememberEntitiesShardStore { // SPI protocol for a remember entities shard store sealed trait Command - final case class AddEntities(entityIds: Set[EntityId]) extends Command - final case class RemoveEntity(entityId: EntityId) extends Command - // responses for UpdateEntity add and remove - final case class UpdateDone(entityIds: Set[EntityId]) + // Note: the store is not expected to receive and handle new update before it has responded to the previous one + final case class Update(started: Set[EntityId], stopped: Set[EntityId]) extends Command + // responses for Update + final case class UpdateDone(started: Set[EntityId], stopped: Set[EntityId]) case object GetEntities extends Command final case class RememberedEntities(entities: Set[EntityId]) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index a094cbdc7f..6bb328e5d7 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -22,6 +22,7 @@ import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.ShardRegion._ import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm } import akka.cluster.sharding.internal.EventSourcedRememberShards.{ MigrationMarker, State => RememberShardsState } +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ State => EntityState } import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages import akka.protobufv3.internal.MessageLite import akka.serialization.BaseSerializer @@ -32,7 +33,7 @@ import java.io.NotSerializableException import akka.actor.Address import akka.cluster.sharding.ShardRegion._ -import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.EntitiesStarted +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ EntitiesStarted, EntitiesStopped } import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages /** @@ -44,7 +45,6 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy with BaseSerializer { import Shard.{ CurrentShardState, GetCurrentShardState } import Shard.{ GetShardStats, ShardStats } - import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ State => EntityState, EntityStopped } import ShardCoordinator.Internal._ private final val BufferSize = 1024 * 4 @@ -74,6 +74,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private val EntityStartedManifest = "CB" private val EntityStoppedManifest = "CD" private val EntitiesStartedManifest = "CE" + private val EntitiesStoppedManifest = "CF" private val StartEntityManifest = "EA" private val StartEntityAckManifest = "EB" @@ -101,6 +102,7 @@ private[akka] class 
ClusterShardingMessageSerializer(val system: ExtendedActorSy EntityStartedManifest -> entityStartedFromBinary, EntitiesStartedManifest -> entitiesStartedFromBinary, EntityStoppedManifest -> entityStoppedFromBinary, + EntitiesStoppedManifest -> entitiesStoppedFromBinary, CoordinatorStateManifest -> coordinatorStateFromBinary, ShardRegionRegisteredManifest -> { bytes => ShardRegionRegistered(actorRefMessageFromBinary(bytes)) @@ -203,7 +205,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy override def manifest(obj: AnyRef): String = obj match { case _: EntityState => EntityStateManifest case _: EntitiesStarted => EntitiesStartedManifest - case _: EntityStopped => EntityStoppedManifest + case _: EntitiesStopped => EntitiesStoppedManifest case _: State => CoordinatorStateManifest case _: ShardRegionRegistered => ShardRegionRegisteredManifest @@ -276,7 +278,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy case m: EntityState => entityStateToProto(m).toByteArray case m: EntitiesStarted => entitiesStartedToProto(m).toByteArray - case m: EntityStopped => entityStoppedToProto(m).toByteArray + case m: EntitiesStopped => entitiesStoppedToProto(m).toByteArray case s: StartEntity => startEntityToByteArray(s) case s: StartEntityAck => startEntityAckToByteArray(s) @@ -417,11 +419,14 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private def entitiesStartedFromBinary(bytes: Array[Byte]): EntitiesStarted = EntitiesStarted(sm.EntitiesStarted.parseFrom(bytes).getEntityIdList.asScala.toSet) - private def entityStoppedToProto(evt: EntityStopped): sm.EntityStopped = - sm.EntityStopped.newBuilder().setEntityId(evt.entityId).build() + private def entitiesStoppedToProto(evt: EntitiesStopped): sm.EntitiesStopped = + sm.EntitiesStopped.newBuilder().addAllEntityId(evt.entities.asJava).build() - private def entityStoppedFromBinary(bytes: Array[Byte]): EntityStopped = - EntityStopped(sm.EntityStopped.parseFrom(bytes).getEntityId) + private def entityStoppedFromBinary(bytes: Array[Byte]): EntitiesStopped = + EntitiesStopped(Set(sm.EntityStopped.parseFrom(bytes).getEntityId)) + + private def entitiesStoppedFromBinary(bytes: Array[Byte]): EntitiesStopped = + EntitiesStopped(sm.EntitiesStopped.parseFrom(bytes).getEntityIdList.asScala.toSet) private def shardStatsToProto(evt: ShardStats): sm.ShardStats = sm.ShardStats.newBuilder().setShard(evt.shardId).setEntityCount(evt.entityCount).build() diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala deleted file mode 100644 index 8b13789179..0000000000 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala +++ /dev/null @@ -1 +0,0 @@ - diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala index e282f134eb..ec6fff8306 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala @@ -5,7 +5,15 @@ package akka.cluster.sharding import akka.actor.ActorRef import akka.cluster.sharding -import akka.cluster.sharding.Shard.{ Active, Passivating, RememberedButNotCreated, Remembering, Stopped } +import 
akka.cluster.sharding.Shard.{ + Active, + NoState, + Passivating, + PassivationComplete, + RememberedButNotCreated, + RememberingStart, + RememberingStop +} import akka.event.NoLogging import akka.util.OptionVal import org.scalatest.matchers.should.Matchers @@ -15,62 +23,81 @@ class EntitiesSpec extends AnyWordSpec with Matchers { "Entities" should { "start empty" in { - val entities = new sharding.Shard.Entities(NoLogging) + val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = false) entities.activeEntityIds() shouldEqual Set.empty entities.size shouldEqual 0 entities.activeEntities() shouldEqual Set.empty } "set already remembered entities to state RememberedButNotStarted" in { - val entities = new sharding.Shard.Entities(NoLogging) + val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true) val ids = Set("a", "b", "c") entities.alreadyRemembered(ids) entities.activeEntities() shouldEqual Set.empty entities.size shouldEqual 3 ids.foreach { id => - entities.entityState(id) shouldEqual OptionVal.Some(RememberedButNotCreated) + entities.entityState(id) shouldEqual RememberedButNotCreated } } - "set state to terminating" in { - val entities = new sharding.Shard.Entities(NoLogging) - val ref = ActorRef.noSender - entities.addEntity("a", ref) - entities.terminated(ref) - entities.entityState("a") shouldEqual OptionVal.Some(Stopped) - entities.activeEntities() shouldEqual Set.empty + "set state to remembering start" in { + val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true) + entities.rememberingStart("a", None) + entities.entityState("a") shouldEqual RememberingStart(None) + entities.pendingRememberedEntitiesExist() should ===(true) + val (starts, stops) = entities.pendingRememberEntities() + starts.keySet should contain("a") + stops should be(empty) + + // also verify removal from pending once it starts + entities.addEntity("a", ActorRef.noSender) + entities.pendingRememberedEntitiesExist() should ===(false) + entities.pendingRememberEntities()._1 should be(empty) + } + "set state to remembering stop" in { + val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true) + entities.addEntity("a", ActorRef.noSender) // need to go through active to passivate + entities.entityPassivating("a") // need to go through passivate to remember stop + entities.rememberingStop("a", PassivationComplete) + entities.entityState("a") shouldEqual RememberingStop(PassivationComplete) + entities.pendingRememberedEntitiesExist() should ===(true) + val (starts, stops) = entities.pendingRememberEntities() + stops should contain("a") + starts should be(empty) + + // also verify removal from pending once it stops + entities.removeEntity("a") + entities.pendingRememberedEntitiesExist() should ===(false) + entities.pendingRememberEntities()._2 should be(empty) } - "set state to remembering" in { - val entities = new sharding.Shard.Entities(NoLogging) - entities.remembering("a") - entities.entityState("a") shouldEqual OptionVal.Some(Remembering) - } "fully remove an entity" in { - val entities = new sharding.Shard.Entities(NoLogging) + val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true) val ref = ActorRef.noSender entities.addEntity("a", ref) + entities.entityPassivating("a") // needs to go through passivating to be removed entities.removeEntity("a") - entities.entityState("a") shouldEqual OptionVal.None - entities.activeEntities() shouldEqual Set.empty + entities.entityState("a") shouldEqual 
NoState + entities.activeEntities() should be(empty) + entities.activeEntityIds() should be(empty) } "add an entity as active" in { - val entities = new sharding.Shard.Entities(NoLogging) + val entities = new sharding.Shard.Entities(NoLogging, false) val ref = ActorRef.noSender entities.addEntity("a", ref) - entities.entityState("a") shouldEqual OptionVal.Some(Active(ref)) + entities.entityState("a") shouldEqual Active(ref) } "look up actor ref by id" in { - val entities = new sharding.Shard.Entities(NoLogging) + val entities = new sharding.Shard.Entities(NoLogging, false) val ref = ActorRef.noSender entities.addEntity("a", ref) entities.entityId(ref) shouldEqual OptionVal.Some("a") } "set state to passivating" in { - val entities = new sharding.Shard.Entities(NoLogging) + val entities = new sharding.Shard.Entities(NoLogging, false) val ref = ActorRef.noSender entities.addEntity("a", ref) entities.entityPassivating("a") - entities.entityState("a") shouldEqual OptionVal.Some(Passivating(ref)) + entities.entityState("a") shouldEqual Passivating(ref) } } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesBatchedUpdatesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesBatchedUpdatesSpec.scala new file mode 100644 index 0000000000..0fbbb39388 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesBatchedUpdatesSpec.scala @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props } +import akka.cluster.{ Cluster, MemberStatus } +import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object RememberEntitiesBatchedUpdatesSpec { + + case class EntityEnvelope(id: Int, msg: Any) + + object EntityActor { + case class Started(id: Int) + case class Stopped(id: Int) + def props(probe: ActorRef) = Props(new EntityActor(probe)) + } + class EntityActor(probe: ActorRef) extends Actor with ActorLogging { + import EntityActor._ + probe ! Started(self.path.name.toInt) + override def receive: Receive = { + case "stop" => + log.debug("Got stop message, stopping") + context.stop(self) + case "graceful-stop" => + log.debug("Got a graceful stop, requesting passivation") + context.parent ! ShardRegion.Passivate("stop") + case "start" => + log.debug("Got a start") + case "ping" => + } + + override def postStop(): Unit = { + probe ! 
Stopped(self.path.name.toInt) + } + } + + def config = ConfigFactory.parseString(""" + akka.loglevel=DEBUG + # akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.actor.provider = cluster + akka.remote.artery.canonical.port = 0 + akka.remote.classic.netty.tcp.port = 0 + akka.cluster.sharding.state-store-mode = ddata + akka.cluster.sharding.remember-entities = on + # no leaks between test runs thank you + akka.cluster.sharding.distributed-data.durable.keys = [] + """.stripMargin) +} +class RememberEntitiesBatchedUpdatesSpec + extends AkkaSpec(RememberEntitiesBatchedUpdatesSpec.config) + with AnyWordSpecLike + with ImplicitSender { + + import RememberEntitiesBatchedUpdatesSpec._ + + val extractEntityId: ShardRegion.ExtractEntityId = { + case EntityEnvelope(id, payload) => (id.toString, payload) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case EntityEnvelope(_, _) => "1" // single shard for all entities + case ShardRegion.StartEntity(_) => "1" + } + + override def atStartup(): Unit = { + // Form a one node cluster + val cluster = Cluster(system) + cluster.join(cluster.selfAddress) + awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1)) + } + + "Batching of starts and stops" must { + + "work" in { + val probe = TestProbe() + val sharding = ClusterSharding(system).start( + "batching", + EntityActor.props(probe.ref), + ClusterShardingSettings(system), + extractEntityId, + extractShardId) + + // make sure that sharding is up and running + sharding.tell(EntityEnvelope(0, "ping"), probe.ref) + probe.expectMsg(EntityActor.Started(0)) + + // start 20, should write first and batch the rest + (1 to 20).foreach { i => + sharding ! EntityEnvelope(i, "start") + } + probe.receiveN(20) + + // start 20 more, and stop the previous ones that are already running, + // should create a mixed batch of start + stops + (21 to 40).foreach { i => + sharding ! EntityEnvelope(i, "start") + sharding ! EntityEnvelope(i - 20, "graceful-stop") + } + probe.receiveN(40) + // stop the last 20, should batch stops only + (21 to 40).foreach { i => + sharding ! 
EntityEnvelope(i, "graceful-stop") + } + probe.receiveN(20) + } + + } + +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala index 65380547f3..2fbd6e0ffa 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala @@ -8,7 +8,6 @@ import akka.Done import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Timers } import akka.cluster.Cluster import akka.cluster.MemberStatus -import akka.cluster.sharding.ShardRegion.EntityId import akka.cluster.sharding.ShardRegion.ShardId import akka.cluster.sharding.internal.RememberEntitiesCoordinatorStore import akka.cluster.sharding.internal.RememberEntitiesShardStore @@ -44,6 +43,7 @@ object RememberEntitiesFailureSpec { """) class EntityActor extends Actor with ActorLogging { + log.info("Entity actor [{}] starting up", context.self.path.name) override def receive: Receive = { case "stop" => log.info("Stopping myself!") @@ -87,10 +87,8 @@ object RememberEntitiesFailureSpec { object FakeShardStoreActor { def props(shardId: ShardId): Props = Props(new FakeShardStoreActor(shardId)) - case class FailAddEntity(entityId: Set[EntityId], whichWay: Fail) - case class FailRemoveEntity(entityId: EntityId, whichWay: Fail) - case class ClearAddFail(entityId: Set[EntityId]) - case class ClearRemoveFail(entityId: EntityId) + case class FailUpdateEntity(whichWay: Fail) + case object ClearFail case class Delayed(replyTo: ActorRef, msg: Any) } @@ -98,8 +96,7 @@ object RememberEntitiesFailureSpec { import FakeShardStoreActor._ implicit val ec = context.system.dispatcher - private var failAddEntity = Map.empty[Set[EntityId], Fail] - private var failRemoveEntity = Map.empty[EntityId, Fail] + private var failUpdate: Option[Fail] = None context.system.eventStream.publish(ShardStoreCreated(self, shardId)) @@ -114,9 +111,9 @@ object RememberEntitiesFailureSpec { log.debug("Delaying initial entities listing with {}", howLong) timers.startSingleTimer("get-entities-delay", Delayed(sender(), Set.empty), howLong) } - case RememberEntitiesShardStore.AddEntities(entityId) => - failAddEntity.get(entityId) match { - case None => sender ! RememberEntitiesShardStore.UpdateDone(entityId) + case RememberEntitiesShardStore.Update(started, stopped) => + failUpdate match { + case None => sender ! RememberEntitiesShardStore.UpdateDone(started, stopped) case Some(NoResponse) => log.debug("Sending no response for AddEntity") case Some(CrashStore) => throw TestException("store crash on AddEntity") case Some(StopStore) => context.stop(self) @@ -124,27 +121,11 @@ object RememberEntitiesFailureSpec { log.debug("Delaying response for AddEntity with {}", howLong) timers.startSingleTimer("add-entity-delay", Delayed(sender(), Set.empty), howLong) } - case RememberEntitiesShardStore.RemoveEntity(entityId) => - failRemoveEntity.get(entityId) match { - case None => sender ! 
RememberEntitiesShardStore.UpdateDone(Set(entityId)) - case Some(NoResponse) => log.debug("Sending no response for RemoveEntity") - case Some(CrashStore) => throw TestException("store crash on AddEntity") - case Some(StopStore) => context.stop(self) - case Some(Delay(howLong)) => - log.debug("Delaying response for RemoveEntity with {}", howLong) - timers.startSingleTimer("remove-entity-delay", Delayed(sender(), Set.empty), howLong) - } - case FailAddEntity(id, whichWay) => - failAddEntity = failAddEntity.updated(id, whichWay) + case FailUpdateEntity(whichWay) => + failUpdate = Some(whichWay) sender() ! Done - case FailRemoveEntity(id, whichWay) => - failRemoveEntity = failRemoveEntity.updated(id, whichWay) - sender() ! Done - case ClearAddFail(id) => - failAddEntity = failAddEntity - id - sender() ! Done - case ClearRemoveFail(id) => - failRemoveEntity = failRemoveEntity - id + case ClearFail => + failUpdate = None sender() ! Done case Delayed(to, msg) => to ! msg @@ -217,7 +198,7 @@ class RememberEntitiesFailureSpec "Remember entities handling in sharding" must { - List(NoResponse, CrashStore, StopStore, Delay(1.second), Delay(2.seconds)).foreach { wayToFail: Fail => + List(NoResponse, CrashStore, StopStore, Delay(500.millis), Delay(1.second)).foreach { wayToFail: Fail => s"recover when initial remember entities load fails $wayToFail" in { log.debug("Getting entities for shard 1 will fail") failShardGetEntities = Map("1" -> wayToFail) @@ -267,7 +248,7 @@ class RememberEntitiesFailureSpec probe.expectMsg("hello-1") // hit shard with other entity that will fail - shardStore.tell(FakeShardStoreActor.FailAddEntity(Set("11"), wayToFail), storeProbe.ref) + shardStore.tell(FakeShardStoreActor.FailUpdateEntity(wayToFail), storeProbe.ref) storeProbe.expectMsg(Done) sharding.tell(EntityEnvelope(11, "hello-11"), probe.ref) @@ -280,7 +261,7 @@ class RememberEntitiesFailureSpec } val stopFailingProbe = TestProbe() - shardStore.tell(FakeShardStoreActor.ClearAddFail(Set("11")), stopFailingProbe.ref) + shardStore.tell(FakeShardStoreActor.ClearFail, stopFailingProbe.ref) stopFailingProbe.expectMsg(Done) // it takes a while - timeout hits and then backoff @@ -310,13 +291,13 @@ class RememberEntitiesFailureSpec probe.expectMsg("hello-1") // fail it when stopping - shard1Store.tell(FakeShardStoreActor.FailRemoveEntity("1", wayToFail), storeProbe.ref) + shard1Store.tell(FakeShardStoreActor.FailUpdateEntity(wayToFail), storeProbe.ref) storeProbe.expectMsg(Done) // FIXME restart without passivating is not saved and re-started again without storing the stop so this isn't testing anything sharding ! EntityEnvelope(1, "stop") - shard1Store.tell(FakeShardStoreActor.ClearRemoveFail("1"), storeProbe.ref) + shard1Store.tell(FakeShardStoreActor.ClearFail, storeProbe.ref) storeProbe.expectMsg(Done) // it takes a while - timeout hits and then backoff @@ -348,13 +329,17 @@ class RememberEntitiesFailureSpec probe.expectMsg("hello-1") // fail it when stopping - shard1Store.tell(FakeShardStoreActor.FailRemoveEntity("1", wayToFail), storeProbe.ref) + shard1Store.tell(FakeShardStoreActor.FailUpdateEntity(wayToFail), storeProbe.ref) storeProbe.expectMsg(Done) sharding ! 
EntityEnvelope(1, "graceful-stop") - shard1Store.tell(FakeShardStoreActor.ClearRemoveFail("1"), storeProbe.ref) - storeProbe.expectMsg(Done) + if (wayToFail != CrashStore && wayToFail != StopStore) { + // race, give the shard some time to see the passivation before restoring the fake shard store + Thread.sleep(250) + shard1Store.tell(FakeShardStoreActor.ClearFail, probe.ref) + probe.expectMsg(Done) + } // it takes a while? awaitAssert({ diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala new file mode 100644 index 0000000000..0b8316854e --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.internal + +import akka.cluster.ddata.{ Replicator, ReplicatorSettings } +import akka.cluster.sharding.ClusterShardingSettings +import akka.cluster.{ Cluster, MemberStatus } +import akka.testkit.{ AkkaSpec, ImplicitSender, WithLogCapturing } +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object DDataRememberEntitiesShardStoreSpec { + def config = ConfigFactory.parseString(""" + akka.loglevel=DEBUG + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.actor.provider = cluster + akka.remote.artery.canonical.port = 0 + akka.remote.classic.netty.tcp.port = 0 + akka.cluster.sharding.state-store-mode = ddata + akka.cluster.sharding.remember-entities = on + # no leaks between test runs thank you + akka.cluster.sharding.distributed-data.durable.keys = [] + """.stripMargin) +} + +// FIXME generalize to general test and cover both ddata and eventsourced +class DDataRememberEntitiesShardStoreSpec + extends AkkaSpec(DDataRememberEntitiesShardStoreSpec.config) + with AnyWordSpecLike + with ImplicitSender + with WithLogCapturing { + + override def atStartup(): Unit = { + // Form a one node cluster + val cluster = Cluster(system) + cluster.join(cluster.selfAddress) + awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1)) + } + + "The DDataRememberEntitiesShardStore" must { + + "store starts and stops and list remembered entity ids" in { + val replicatorSettings = ReplicatorSettings(system) + val replicator = system.actorOf(Replicator.props(replicatorSettings)) + + val shardingSettings = ClusterShardingSettings(system) + val store = system.actorOf( + DDataRememberEntitiesShardStore + .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1)) + + store ! RememberEntitiesShardStore.GetEntities + expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should be(empty) + + store ! RememberEntitiesShardStore.Update(Set("1", "2", "3"), Set.empty) + expectMsg(RememberEntitiesShardStore.UpdateDone(Set("1", "2", "3"), Set.empty)) + + store ! RememberEntitiesShardStore.Update(Set("4", "5", "6"), Set("2", "3")) + expectMsg(RememberEntitiesShardStore.UpdateDone(Set("4", "5", "6"), Set("2", "3"))) + + store ! RememberEntitiesShardStore.Update(Set.empty, Set("6")) + expectMsg(RememberEntitiesShardStore.UpdateDone(Set.empty, Set("6"))) + + store ! RememberEntitiesShardStore.Update(Set("2"), Set.empty) + expectMsg(RememberEntitiesShardStore.UpdateDone(Set("2"), Set.empty)) + + store ! 
RememberEntitiesShardStore.GetEntities + expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) + + } + + } + +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala index fa0d6a42b1..ad61a5fe82 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala @@ -76,7 +76,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec { "be able to serialize PersistentShard domain events" in { checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStarted(Set("e1", "e2"))) - checkSerialization(EventSourcedRememberEntitiesStore.EntityStopped("e1")) + checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStopped(Set("e1", "e2"))) } "be able to deserialize old entity started event into entities started" in {