Batch remember entity stops (#29149)

Johan Andrén 2020-06-01 11:03:03 +02:00 committed by GitHub
parent 0d1237fd44
commit b9667fb6b3
14 changed files with 1572 additions and 450 deletions

View file

@ -228,8 +228,9 @@ abstract class ClusterShardingRememberEntitiesPerfSpec
}
awaitAssert({
-region ! GetShardRegionState
-val stats = expectMsgType[CurrentShardRegionState]
val probe = TestProbe()
region.tell(GetShardRegionState, probe.ref)
val stats = probe.expectMsgType[CurrentShardRegionState]
stats.shards.head.shardId shouldEqual "0"
stats.shards.head.entityIds.toList.sorted shouldEqual List("0") // the init entity
}, 2.seconds)

View file

@ -5732,6 +5732,10 @@ public final class ClusterShardingMessages {
getEntityIdBytes();
}
/**
* <pre>
* not written anymore but kept for backwards compatibility
* </pre>
*
* Protobuf type {@code EntityStarted}
*/
public static final class EntityStarted extends
@ -6026,6 +6030,10 @@ public final class ClusterShardingMessages {
return builder;
}
/**
* <pre>
* not written anymore but kept for backwards compatibility
* </pre>
*
* Protobuf type {@code EntityStarted}
*/
public static final class Builder extends
@ -6979,6 +6987,10 @@ public final class ClusterShardingMessages {
getEntityIdBytes();
}
/**
* <pre>
* not written anymore but kept for backwards compatibility
* </pre>
*
* Protobuf type {@code EntityStopped}
*/
public static final class EntityStopped extends
@ -7273,6 +7285,10 @@ public final class ClusterShardingMessages {
return builder;
}
/**
* <pre>
* not written anymore but kept for backwards compatibility
* </pre>
*
* Protobuf type {@code EntityStopped}
*/
public static final class Builder extends
@ -7567,6 +7583,643 @@ public final class ClusterShardingMessages {
}
public interface EntitiesStoppedOrBuilder extends
// @@protoc_insertion_point(interface_extends:EntitiesStopped)
akka.protobufv3.internal.MessageOrBuilder {
/**
* <code>repeated string entityId = 1;</code>
* @return A list containing the entityId.
*/
java.util.List<java.lang.String>
getEntityIdList();
/**
* <code>repeated string entityId = 1;</code>
* @return The count of entityId.
*/
int getEntityIdCount();
/**
* <code>repeated string entityId = 1;</code>
* @param index The index of the element to return.
* @return The entityId at the given index.
*/
java.lang.String getEntityId(int index);
/**
* <code>repeated string entityId = 1;</code>
* @param index The index of the value to return.
* @return The bytes of the entityId at the given index.
*/
akka.protobufv3.internal.ByteString
getEntityIdBytes(int index);
}
/**
* Protobuf type {@code EntitiesStopped}
*/
public static final class EntitiesStopped extends
akka.protobufv3.internal.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:EntitiesStopped)
EntitiesStoppedOrBuilder {
private static final long serialVersionUID = 0L;
// Use EntitiesStopped.newBuilder() to construct.
private EntitiesStopped(akka.protobufv3.internal.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private EntitiesStopped() {
entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY;
}
@java.lang.Override
@SuppressWarnings({"unused"})
protected java.lang.Object newInstance(
akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) {
return new EntitiesStopped();
}
@java.lang.Override
public final akka.protobufv3.internal.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private EntitiesStopped(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
this();
if (extensionRegistry == null) {
throw new java.lang.NullPointerException();
}
int mutable_bitField0_ = 0;
akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields =
akka.protobufv3.internal.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 10: {
akka.protobufv3.internal.ByteString bs = input.readBytes();
if (!((mutable_bitField0_ & 0x00000001) != 0)) {
entityId_ = new akka.protobufv3.internal.LazyStringArrayList();
mutable_bitField0_ |= 0x00000001;
}
entityId_.add(bs);
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
done = true;
}
break;
}
}
}
} catch (akka.protobufv3.internal.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new akka.protobufv3.internal.InvalidProtocolBufferException(
e).setUnfinishedMessage(this);
} finally {
if (((mutable_bitField0_ & 0x00000001) != 0)) {
entityId_ = entityId_.getUnmodifiableView();
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final akka.protobufv3.internal.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_descriptor;
}
@java.lang.Override
protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.Builder.class);
}
public static final int ENTITYID_FIELD_NUMBER = 1;
private akka.protobufv3.internal.LazyStringList entityId_;
/**
* <code>repeated string entityId = 1;</code>
* @return A list containing the entityId.
*/
public akka.protobufv3.internal.ProtocolStringList
getEntityIdList() {
return entityId_;
}
/**
* <code>repeated string entityId = 1;</code>
* @return The count of entityId.
*/
public int getEntityIdCount() {
return entityId_.size();
}
/**
* <code>repeated string entityId = 1;</code>
* @param index The index of the element to return.
* @return The entityId at the given index.
*/
public java.lang.String getEntityId(int index) {
return entityId_.get(index);
}
/**
* <code>repeated string entityId = 1;</code>
* @param index The index of the value to return.
* @return The bytes of the entityId at the given index.
*/
public akka.protobufv3.internal.ByteString
getEntityIdBytes(int index) {
return entityId_.getByteString(index);
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized == 1) return true;
if (isInitialized == 0) return false;
memoizedIsInitialized = 1;
return true;
}
@java.lang.Override
public void writeTo(akka.protobufv3.internal.CodedOutputStream output)
throws java.io.IOException {
for (int i = 0; i < entityId_.size(); i++) {
akka.protobufv3.internal.GeneratedMessageV3.writeString(output, 1, entityId_.getRaw(i));
}
unknownFields.writeTo(output);
}
@java.lang.Override
public int getSerializedSize() {
int size = memoizedSize;
if (size != -1) return size;
size = 0;
{
int dataSize = 0;
for (int i = 0; i < entityId_.size(); i++) {
dataSize += computeStringSizeNoTag(entityId_.getRaw(i));
}
size += dataSize;
size += 1 * getEntityIdList().size();
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped)) {
return super.equals(obj);
}
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped other = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped) obj;
if (!getEntityIdList()
.equals(other.getEntityIdList())) return false;
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@java.lang.Override
public int hashCode() {
if (memoizedHashCode != 0) {
return memoizedHashCode;
}
int hash = 41;
hash = (19 * hash) + getDescriptor().hashCode();
if (getEntityIdCount() > 0) {
hash = (37 * hash) + ENTITYID_FIELD_NUMBER;
hash = (53 * hash) + getEntityIdList().hashCode();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
java.nio.ByteBuffer data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
java.nio.ByteBuffer data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
akka.protobufv3.internal.ByteString data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
akka.protobufv3.internal.ByteString data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(byte[] data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
byte[] data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(java.io.InputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
java.io.InputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseDelimitedFrom(
java.io.InputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input, extensionRegistry);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
akka.protobufv3.internal.CodedInputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parseFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
@java.lang.Override
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder() {
return DEFAULT_INSTANCE.toBuilder();
}
public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped prototype) {
return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
}
@java.lang.Override
public Builder toBuilder() {
return this == DEFAULT_INSTANCE
? new Builder() : new Builder().mergeFrom(this);
}
@java.lang.Override
protected Builder newBuilderForType(
akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code EntitiesStopped}
*/
public static final class Builder extends
akka.protobufv3.internal.GeneratedMessageV3.Builder<Builder> implements
// @@protoc_insertion_point(builder_implements:EntitiesStopped)
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStoppedOrBuilder {
public static final akka.protobufv3.internal.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_descriptor;
}
@java.lang.Override
protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.Builder.class);
}
// Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (akka.protobufv3.internal.GeneratedMessageV3
.alwaysUseFieldBuilders) {
}
}
@java.lang.Override
public Builder clear() {
super.clear();
entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
@java.lang.Override
public akka.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_EntitiesStopped_descriptor;
}
@java.lang.Override
public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped getDefaultInstanceForType() {
return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.getDefaultInstance();
}
@java.lang.Override
public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped build() {
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
@java.lang.Override
public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped buildPartial() {
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped(this);
int from_bitField0_ = bitField0_;
if (((bitField0_ & 0x00000001) != 0)) {
entityId_ = entityId_.getUnmodifiableView();
bitField0_ = (bitField0_ & ~0x00000001);
}
result.entityId_ = entityId_;
onBuilt();
return result;
}
@java.lang.Override
public Builder clone() {
return super.clone();
}
@java.lang.Override
public Builder setField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
java.lang.Object value) {
return super.setField(field, value);
}
@java.lang.Override
public Builder clearField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field) {
return super.clearField(field);
}
@java.lang.Override
public Builder clearOneof(
akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) {
return super.clearOneof(oneof);
}
@java.lang.Override
public Builder setRepeatedField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
int index, java.lang.Object value) {
return super.setRepeatedField(field, index, value);
}
@java.lang.Override
public Builder addRepeatedField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
java.lang.Object value) {
return super.addRepeatedField(field, value);
}
@java.lang.Override
public Builder mergeFrom(akka.protobufv3.internal.Message other) {
if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped) {
return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped other) {
if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped.getDefaultInstance()) return this;
if (!other.entityId_.isEmpty()) {
if (entityId_.isEmpty()) {
entityId_ = other.entityId_;
bitField0_ = (bitField0_ & ~0x00000001);
} else {
ensureEntityIdIsMutable();
entityId_.addAll(other.entityId_);
}
onChanged();
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
}
@java.lang.Override
public final boolean isInitialized() {
return true;
}
@java.lang.Override
public Builder mergeFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobufv3.internal.InvalidProtocolBufferException e) {
parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped) e.getUnfinishedMessage();
throw e.unwrapIOException();
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
private akka.protobufv3.internal.LazyStringList entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY;
private void ensureEntityIdIsMutable() {
if (!((bitField0_ & 0x00000001) != 0)) {
entityId_ = new akka.protobufv3.internal.LazyStringArrayList(entityId_);
bitField0_ |= 0x00000001;
}
}
/**
* <code>repeated string entityId = 1;</code>
* @return A list containing the entityId.
*/
public akka.protobufv3.internal.ProtocolStringList
getEntityIdList() {
return entityId_.getUnmodifiableView();
}
/**
* <code>repeated string entityId = 1;</code>
* @return The count of entityId.
*/
public int getEntityIdCount() {
return entityId_.size();
}
/**
* <code>repeated string entityId = 1;</code>
* @param index The index of the element to return.
* @return The entityId at the given index.
*/
public java.lang.String getEntityId(int index) {
return entityId_.get(index);
}
/**
* <code>repeated string entityId = 1;</code>
* @param index The index of the value to return.
* @return The bytes of the entityId at the given index.
*/
public akka.protobufv3.internal.ByteString
getEntityIdBytes(int index) {
return entityId_.getByteString(index);
}
/**
* <code>repeated string entityId = 1;</code>
* @param index The index to set the value at.
* @param value The entityId to set.
* @return This builder for chaining.
*/
public Builder setEntityId(
int index, java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureEntityIdIsMutable();
entityId_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated string entityId = 1;</code>
* @param value The entityId to add.
* @return This builder for chaining.
*/
public Builder addEntityId(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureEntityIdIsMutable();
entityId_.add(value);
onChanged();
return this;
}
/**
* <code>repeated string entityId = 1;</code>
* @param values The entityId to add.
* @return This builder for chaining.
*/
public Builder addAllEntityId(
java.lang.Iterable<java.lang.String> values) {
ensureEntityIdIsMutable();
akka.protobufv3.internal.AbstractMessageLite.Builder.addAll(
values, entityId_);
onChanged();
return this;
}
/**
* <code>repeated string entityId = 1;</code>
* @return This builder for chaining.
*/
public Builder clearEntityId() {
entityId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
onChanged();
return this;
}
/**
* <code>repeated string entityId = 1;</code>
* @param value The bytes of the entityId to add.
* @return This builder for chaining.
*/
public Builder addEntityIdBytes(
akka.protobufv3.internal.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureEntityIdIsMutable();
entityId_.add(value);
onChanged();
return this;
}
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
}
@java.lang.Override
public final Builder mergeUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
return super.mergeUnknownFields(unknownFields);
}
// @@protoc_insertion_point(builder_scope:EntitiesStopped)
}
// @@protoc_insertion_point(class_scope:EntitiesStopped)
private static final akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped DEFAULT_INSTANCE;
static {
DEFAULT_INSTANCE = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped();
}
public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped getDefaultInstance() {
return DEFAULT_INSTANCE;
}
@java.lang.Deprecated public static final akka.protobufv3.internal.Parser<EntitiesStopped>
PARSER = new akka.protobufv3.internal.AbstractParser<EntitiesStopped>() {
@java.lang.Override
public EntitiesStopped parsePartialFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return new EntitiesStopped(input, extensionRegistry);
}
};
public static akka.protobufv3.internal.Parser<EntitiesStopped> parser() {
return PARSER;
}
@java.lang.Override
public akka.protobufv3.internal.Parser<EntitiesStopped> getParserForType() {
return PARSER;
}
@java.lang.Override
public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.EntitiesStopped getDefaultInstanceForType() {
return DEFAULT_INSTANCE;
}
}
public interface ShardStatsOrBuilder extends
// @@protoc_insertion_point(interface_extends:ShardStats)
akka.protobufv3.internal.MessageOrBuilder {
@ -18914,6 +19567,11 @@ public final class ClusterShardingMessages {
private static final
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internal_static_EntityStopped_fieldAccessorTable;
private static final akka.protobufv3.internal.Descriptors.Descriptor
internal_static_EntitiesStopped_descriptor;
private static final
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internal_static_EntitiesStopped_fieldAccessorTable;
private static final akka.protobufv3.internal.Descriptors.Descriptor
internal_static_ShardStats_descriptor;
private static final
@ -19005,29 +19663,30 @@ public final class ClusterShardingMessages {
"\t\022\016\n\006region\030\002 \002(\t\"\037\n\013EntityState\022\020\n\010enti" +
"ties\030\001 \003(\t\"!\n\rEntityStarted\022\020\n\010entityId\030" +
"\001 \002(\t\"#\n\017EntitiesStarted\022\020\n\010entityId\030\001 \003" +
-"(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t\"0\n" +
-"\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityCoun" +
-"t\030\002 \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030\001 \003" +
-"(\0132\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n\rMa" +
-"pFieldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\005\"" +
-"/\n\027GetClusterShardingStats\022\024\n\014timeoutNan" +
-"os\030\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005stat" +
-"s\030\001 \003(\0132\032.ClusterShardingStatsEntry\"X\n\031C" +
-"lusterShardingStatsEntry\022\031\n\007address\030\001 \002(" +
-"\0132\010.Address\022 \n\005stats\030\002 \002(\0132\021.ShardRegion" +
-"Stats\"+\n\016CurrentRegions\022\031\n\007regions\030\001 \003(\013" +
-"2\010.Address\"K\n\007Address\022\020\n\010protocol\030\001 \002(\t\022" +
-"\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004por" +
-"t\030\004 \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 \002(\t" +
-"\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022\017\n\007" +
-"shardId\030\002 \002(\t\"7\n\021CurrentShardState\022\017\n\007sh" +
-"ardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"0\n\nShardS" +
-"tate\022\017\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t" +
-"\"F\n\027CurrentShardRegionState\022\033\n\006shards\030\001 " +
-"\003(\0132\013.ShardState\022\016\n\006failed\030\002 \003(\t\"7\n\024Reme" +
-"mberedShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n\006mar" +
-"ker\030\002 \001(\010B&\n\"akka.cluster.sharding.proto" +
-"buf.msgH\001"
"(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t\"#\n" +
"\017EntitiesStopped\022\020\n\010entityId\030\001 \003(\t\"0\n\nSh" +
"ardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityCount\030\002" +
" \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030\001 \003(\0132" +
"\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n\rMapFi" +
"eldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\005\"/\n\027" +
"GetClusterShardingStats\022\024\n\014timeoutNanos\030" +
"\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005stats\030\001" +
" \003(\0132\032.ClusterShardingStatsEntry\"X\n\031Clus" +
"terShardingStatsEntry\022\031\n\007address\030\001 \002(\0132\010" +
".Address\022 \n\005stats\030\002 \002(\0132\021.ShardRegionSta" +
"ts\"+\n\016CurrentRegions\022\031\n\007regions\030\001 \003(\0132\010." +
"Address\"K\n\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006" +
"system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004port\030\004" +
" \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 \002(\t\"3\n" +
"\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022\017\n\007sha" +
"rdId\030\002 \002(\t\"7\n\021CurrentShardState\022\017\n\007shard" +
"Id\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"0\n\nShardStat" +
"e\022\017\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"F\n" +
"\027CurrentShardRegionState\022\033\n\006shards\030\001 \003(\013" +
"2\013.ShardState\022\016\n\006failed\030\002 \003(\t\"7\n\024Remembe" +
"redShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n\006marker" +
"\030\002 \001(\010B&\n\"akka.cluster.sharding.protobuf" +
".msgH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@ -19093,86 +19752,92 @@ public final class ClusterShardingMessages {
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_EntityStopped_descriptor,
new java.lang.String[] { "EntityId", });
-internal_static_ShardStats_descriptor =
internal_static_EntitiesStopped_descriptor =
getDescriptor().getMessageTypes().get(9);
internal_static_EntitiesStopped_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_EntitiesStopped_descriptor,
new java.lang.String[] { "EntityId", });
internal_static_ShardStats_descriptor =
getDescriptor().getMessageTypes().get(10);
internal_static_ShardStats_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_ShardStats_descriptor,
new java.lang.String[] { "Shard", "EntityCount", });
internal_static_ShardRegionStats_descriptor =
-getDescriptor().getMessageTypes().get(10);
getDescriptor().getMessageTypes().get(11);
internal_static_ShardRegionStats_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_ShardRegionStats_descriptor,
new java.lang.String[] { "Stats", "Failed", });
internal_static_MapFieldEntry_descriptor =
-getDescriptor().getMessageTypes().get(11);
getDescriptor().getMessageTypes().get(12);
internal_static_MapFieldEntry_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_MapFieldEntry_descriptor,
new java.lang.String[] { "Key", "Value", });
internal_static_GetClusterShardingStats_descriptor =
-getDescriptor().getMessageTypes().get(12);
getDescriptor().getMessageTypes().get(13);
internal_static_GetClusterShardingStats_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_GetClusterShardingStats_descriptor,
new java.lang.String[] { "TimeoutNanos", });
internal_static_ClusterShardingStats_descriptor =
-getDescriptor().getMessageTypes().get(13);
getDescriptor().getMessageTypes().get(14);
internal_static_ClusterShardingStats_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_ClusterShardingStats_descriptor,
new java.lang.String[] { "Stats", });
internal_static_ClusterShardingStatsEntry_descriptor =
-getDescriptor().getMessageTypes().get(14);
getDescriptor().getMessageTypes().get(15);
internal_static_ClusterShardingStatsEntry_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_ClusterShardingStatsEntry_descriptor,
new java.lang.String[] { "Address", "Stats", });
internal_static_CurrentRegions_descriptor =
-getDescriptor().getMessageTypes().get(15);
getDescriptor().getMessageTypes().get(16);
internal_static_CurrentRegions_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_CurrentRegions_descriptor,
new java.lang.String[] { "Regions", });
internal_static_Address_descriptor =
-getDescriptor().getMessageTypes().get(16);
getDescriptor().getMessageTypes().get(17);
internal_static_Address_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_Address_descriptor,
new java.lang.String[] { "Protocol", "System", "Hostname", "Port", });
internal_static_StartEntity_descriptor =
-getDescriptor().getMessageTypes().get(17);
getDescriptor().getMessageTypes().get(18);
internal_static_StartEntity_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_StartEntity_descriptor,
new java.lang.String[] { "EntityId", });
internal_static_StartEntityAck_descriptor =
-getDescriptor().getMessageTypes().get(18);
getDescriptor().getMessageTypes().get(19);
internal_static_StartEntityAck_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_StartEntityAck_descriptor,
new java.lang.String[] { "EntityId", "ShardId", });
internal_static_CurrentShardState_descriptor =
-getDescriptor().getMessageTypes().get(19);
getDescriptor().getMessageTypes().get(20);
internal_static_CurrentShardState_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_CurrentShardState_descriptor,
new java.lang.String[] { "ShardId", "EntityIds", });
internal_static_ShardState_descriptor =
-getDescriptor().getMessageTypes().get(20);
getDescriptor().getMessageTypes().get(21);
internal_static_ShardState_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_ShardState_descriptor,
new java.lang.String[] { "ShardId", "EntityIds", });
internal_static_CurrentShardRegionState_descriptor =
-getDescriptor().getMessageTypes().get(21);
getDescriptor().getMessageTypes().get(22);
internal_static_CurrentShardRegionState_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_CurrentShardRegionState_descriptor,
new java.lang.String[] { "Shards", "Failed", });
internal_static_RememberedShardState_descriptor =
-getDescriptor().getMessageTypes().get(22);
getDescriptor().getMessageTypes().get(23);
internal_static_RememberedShardState_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_RememberedShardState_descriptor,

View file

@ -42,6 +42,7 @@ message EntityState {
repeated string entities = 1;
}
// not written anymore but kept for backwards compatibility
message EntityStarted {
required string entityId = 1;
}
@ -50,10 +51,15 @@ message EntitiesStarted {
repeated string entityId = 1;
}
// not written anymore but kept for backwards compatibility
message EntityStopped {
required string entityId = 1;
}
message EntitiesStopped {
repeated string entityId = 1;
}
message ShardStats {
required string shard = 1;
required int32 entityCount = 2;
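
The new EntitiesStopped message mirrors EntitiesStarted: a single repeated entityId field carrying every entity stopped in one batch, while the old single-id messages stay in the schema only so data written by older versions remains readable. Below is a minimal round-trip sketch (not part of the commit) against the generated builder/parser API shown earlier; the CollectionConverters import assumes Scala 2.13.

import scala.jdk.CollectionConverters._
import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm }

object EntitiesStoppedRoundTrip extends App {
  val stopped = Set("11", "12", "13")
  // build the batched stop message with the generated builder
  val bytes: Array[Byte] =
    sm.EntitiesStopped.newBuilder().addAllEntityId(stopped.asJava).build().toByteArray
  // parse it back and recover the set of stopped entity ids
  val restored: Set[String] =
    sm.EntitiesStopped.parseFrom(bytes).getEntityIdList.asScala.toSet
  assert(restored == stopped)
}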

View file

@ -29,7 +29,6 @@ import akka.cluster.ddata.SelfUniqueAddress
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion.EntityId
import akka.cluster.sharding.ShardRegion.ShardId
-import akka.cluster.sharding.internal.RememberEntitiesShardStore.{ AddEntities, RemoveEntity }
import akka.util.PrettyDuration._
import scala.concurrent.ExecutionContext
@ -59,6 +58,12 @@ private[akka] object DDataRememberEntitiesShardStore {
private def stateKeys(typeName: String, shardId: ShardId): Array[ORSetKey[EntityId]] =
Array.tabulate(numberOfKeys)(i => ORSetKey[EntityId](s"shard-$typeName-$shardId-$i"))
private sealed trait Evt {
def id: EntityId
}
private case class Started(id: EntityId) extends Evt
private case class Stopped(id: EntityId) extends Evt
}
/**
@ -104,9 +109,8 @@ private[akka] final class DDataRememberEntitiesShardStore(
override def receive: Receive = idle
def idle: Receive = {
case RememberEntitiesShardStore.GetEntities => onGetEntities()
-case AddEntities(ids) => addEntities(ids)
-case RemoveEntity(id) => removeEntity(id)
case update: RememberEntitiesShardStore.Update => onUpdate(update)
}
def waitingForAllEntityIds(requestor: ActorRef, gotKeys: Set[Int], ids: Set[EntityId]): Receive = {
@ -138,78 +142,82 @@ private[akka] final class DDataRememberEntitiesShardStore(
case GetDataDeleted(_, _) =>
log.error("Unable to get an initial state because it was deleted")
context.stop(self)
case update: RememberEntitiesShardStore.Update =>
log.warning("Got an update before load of initial entities completed, dropping update: [{}]", update)
}
}
-private def addEntities(ids: Set[EntityId]): Unit = {
-val updates: Map[Set[EntityId], (Update[ORSet[EntityId]], Int)] = ids.groupBy(key).map {
-case (key, ids) =>
-(ids, (Update(key, ORSet.empty[EntityId], writeMajority, Some(ids)) { existing =>
-ids.foldLeft(existing) {
-case (acc, nextId) => acc :+ nextId
-}
-}, maxUpdateAttempts))
-}
private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = {
// FIXME what about ordering of adds/removes vs sets, I think we can lose one
val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped))
// map from set of evts (for same ddata key) to one update that applies each of them
val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] =
allEvts.groupBy(evt => key(evt.id)).map {
case (key, evts) =>
(evts, (Update(key, ORSet.empty[EntityId], writeMajority, Some(evts)) { existing =>
evts.foldLeft(existing) {
case (acc, Started(id)) => acc :+ id
case (acc, Stopped(id)) => acc.remove(id)
}
}, maxUpdateAttempts))
}
-updates.foreach {
ddataUpdates.foreach {
case (_, (update, _)) =>
replicator ! update
}
-context.become(waitingForUpdates(sender(), ids, updates))
-}
-private def removeEntity(id: EntityId): Unit = {
-val keyForEntity = key(id)
-val update = Update(keyForEntity, ORSet.empty[EntityId], writeMajority, Some(Set(id))) { existing =>
-existing.remove(id)
-}
-replicator ! update
-context.become(waitingForUpdates(sender(), Set(id), Map((Set(id), (update, maxUpdateAttempts)))))
context.become(waitingForUpdates(sender(), update, ddataUpdates))
}
private def waitingForUpdates(
requestor: ActorRef,
-allIds: Set[EntityId],
-updates: Map[Set[EntityId], (Update[ORSet[EntityId]], Int)]): Receive = {
-case UpdateSuccess(_, Some(ids: Set[EntityId] @unchecked)) =>
-if (log.isDebugEnabled)
-log.debug("The DDataShard state was successfully updated for [{}]", ids.mkString(", "))
-val remaining = updates - ids
-if (remaining.isEmpty) {
-requestor ! RememberEntitiesShardStore.UpdateDone(allIds)
-context.become(idle)
-} else {
-context.become(waitingForUpdates(requestor, allIds, remaining))
-}
-case UpdateTimeout(_, Some(ids: Set[EntityId] @unchecked)) =>
-val (update, retriesLeft) = updates(ids)
-if (retriesLeft > 0) {
-log.debug("Retrying update because of write timeout, tries left [{}]", retriesLeft)
-replicator ! update
-context.become(waitingForUpdates(requestor, allIds, updates.updated(ids, (update, retriesLeft - 1))))
-} else {
-log.error(
-"Unable to update state, within 'updating-state-timeout'= [{}], gave up after [{}] retries",
-writeMajority.timeout.pretty,
-maxUpdateAttempts)
update: RememberEntitiesShardStore.Update,
allUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)]): Receive = {
// updatesLeft used both to keep track of what work remains and for retrying on timeout up to a limit
def next(updatesLeft: Map[Set[Evt], (Update[ORSet[EntityId]], Int)]): Receive = {
case UpdateSuccess(_, Some(evts: Set[Evt] @unchecked)) =>
log.debug("The DDataShard state was successfully updated for [{}]", evts)
val remainingAfterThis = updatesLeft - evts
if (remainingAfterThis.isEmpty) {
requestor ! RememberEntitiesShardStore.UpdateDone(update.started, update.stopped)
context.become(idle)
} else {
context.become(next(remainingAfterThis))
}
case UpdateTimeout(_, Some(evts: Set[Evt] @unchecked)) =>
val (updateForEvts, retriesLeft) = updatesLeft(evts)
if (retriesLeft > 0) {
log.debug("Retrying update because of write timeout, tries left [{}]", retriesLeft)
replicator ! updateForEvts
context.become(next(updatesLeft.updated(evts, (updateForEvts, retriesLeft - 1))))
} else {
log.error(
"Unable to update state, within 'updating-state-timeout'= [{}], gave up after [{}] retries",
writeMajority.timeout.pretty,
maxUpdateAttempts)
// will trigger shard restart
context.stop(self)
}
case StoreFailure(_, _) =>
log.error("Unable to update state, due to store failure")
// will trigger shard restart
context.stop(self)
-}
-case StoreFailure(_, _) =>
-log.error("Unable to update state, due to store failure")
-// will trigger shard restart
-context.stop(self)
-case ModifyFailure(_, error, cause, _) =>
-log.error(cause, "Unable to update state, due to modify failure: {}", error)
-// will trigger shard restart
-context.stop(self)
-case UpdateDataDeleted(_, _) =>
-log.error("Unable to update state, due to delete")
-// will trigger shard restart
-context.stop(self)
case ModifyFailure(_, error, cause, _) =>
log.error(cause, "Unable to update state, due to modify failure: {}", error)
// will trigger shard restart
context.stop(self)
case UpdateDataDeleted(_, _) =>
log.error("Unable to update state, due to delete")
// will trigger shard restart
context.stop(self)
case update: RememberEntitiesShardStore.Update =>
log.warning("Got a new update before write of previous completed, dropping update: [{}]", update)
}
next(allUpdates)
}
private def onGetEntities(): Unit = {
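
The idea behind onUpdate above is that every entity id maps to one of a fixed number of ORSet keys, and all starts and stops that land on the same key are folded into a single replicator write. A small standalone sketch of that grouping follows (hypothetical and simplified: keyIndex stands in for the store's key function, and plain case classes stand in for the private Evt types).

object BatchedUpdateGrouping extends App {
  // hypothetical stand-ins for the store's private Evt/Started/Stopped
  sealed trait Evt { def id: String }
  final case class Started(id: String) extends Evt
  final case class Stopped(id: String) extends Evt

  val numberOfKeys = 5
  // stand-in for the store's key(): picks which ORSet key an entity id lands on
  def keyIndex(id: String): Int = math.abs(id.hashCode % numberOfKeys)

  // one Update carries both the started and the stopped ids
  val (started, stopped) = (Set("a", "b"), Set("c"))
  val allEvts: Set[Evt] = started.map(Started) ++ stopped.map(Stopped)

  // everything that hashes to the same key becomes a single replicator write
  val batches: Map[Int, Set[Evt]] = allEvts.groupBy(evt => keyIndex(evt.id))
  println(batches)
}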

View file

@ -60,14 +60,14 @@ private[akka] object EventSourcedRememberEntitiesStore {
/**
* `State` change for starting a set of entities in this `Shard`
*/
-final case class EntitiesStarted(entities: Set[String]) extends StateChange
final case class EntitiesStarted(entities: Set[EntityId]) extends StateChange
case object StartedAck
/**
* `State` change for an entity which has terminated.
*/
-final case class EntityStopped(entityId: EntityId) extends StateChange
final case class EntitiesStopped(entities: Set[EntityId]) extends StateChange
def props(typeName: String, shardId: ShardRegion.ShardId, settings: ClusterShardingSettings): Props =
Props(new EventSourcedRememberEntitiesStore(typeName, shardId, settings))
@ -98,25 +98,28 @@ private[akka] final class EventSourcedRememberEntitiesStore(
override def receiveRecover: Receive = {
case EntitiesStarted(ids) => state = state.copy(state.entities.union(ids))
-case EntityStopped(id) => state = state.copy(state.entities - id)
case EntitiesStopped(ids) => state = state.copy(state.entities -- ids)
case SnapshotOffer(_, snapshot: State) => state = snapshot
case RecoveryCompleted =>
log.debug("Recovery completed for shard [{}] with [{}] entities", shardId, state.entities.size)
}
override def receiveCommand: Receive = {
-case RememberEntitiesShardStore.AddEntities(ids) =>
-persist(EntitiesStarted(ids)) { started =>
-sender() ! RememberEntitiesShardStore.UpdateDone(ids)
-state.copy(state.entities ++ started.entities)
-saveSnapshotWhenNeeded()
-}
-case RememberEntitiesShardStore.RemoveEntity(id) =>
-persist(EntityStopped(id)) { stopped =>
-sender() ! RememberEntitiesShardStore.UpdateDone(Set(id))
-state.copy(state.entities - stopped.entityId)
-saveSnapshotWhenNeeded()
case RememberEntitiesShardStore.Update(started, stopped) =>
val events =
(if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) :::
(if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil)
var left = events.size
persistAll(events) { _ =>
left -= 1
if (left == 0) {
sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
state.copy(state.entities.union(started) -- stopped)
saveSnapshotWhenNeeded()
}
}
case RememberEntitiesShardStore.GetEntities =>
sender() ! RememberEntitiesShardStore.RememberedEntities(state.entities)
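
With this change the event-sourced store persists at most two events per Update (one EntitiesStarted, one EntitiesStopped) and acknowledges only once all of them are written; recovery folds the batches back into the remembered set. A small sketch of that fold, using simplified stand-in event types rather than the store's actual classes and without any persistence plumbing:

object RememberedEntitiesReplay extends App {
  // simplified local stand-ins for the store's batched events
  final case class EntitiesStarted(entities: Set[String])
  final case class EntitiesStopped(entities: Set[String])

  // a journal as the new store would write it: batches instead of one event per entity
  val journal: List[Any] = List(
    EntitiesStarted(Set("1", "2", "3")),
    EntitiesStopped(Set("2")),
    EntitiesStarted(Set("4")))

  // mirrors receiveRecover above: union the started ids, subtract the stopped ones
  val remembered = journal.foldLeft(Set.empty[String]) {
    case (acc, EntitiesStarted(ids)) => acc.union(ids)
    case (acc, EntitiesStopped(ids)) => acc -- ids
    case (acc, _)                    => acc
  }
  println(remembered) // Set(1, 3, 4)
}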

View file

@ -46,10 +46,10 @@ private[akka] object RememberEntitiesShardStore {
// SPI protocol for a remember entities shard store
sealed trait Command
-final case class AddEntities(entityIds: Set[EntityId]) extends Command
-final case class RemoveEntity(entityId: EntityId) extends Command
-// responses for UpdateEntity add and remove
-final case class UpdateDone(entityIds: Set[EntityId])
// Note: the store is not expected to receive and handle new update before it has responded to the previous one
final case class Update(started: Set[EntityId], stopped: Set[EntityId]) extends Command
// responses for Update
final case class UpdateDone(started: Set[EntityId], stopped: Set[EntityId])
case object GetEntities extends Command
final case class RememberedEntities(entities: Set[EntityId])
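
One way to read the new SPI contract: a shard sends a single Update carrying both the started and the stopped ids, waits for the matching UpdateDone (which echoes the written sets), and only then sends the next Update. A minimal sketch of that conversation, assuming it is compiled inside the akka.cluster.sharding.internal package (the SPI is private[akka]) and that `store` is one of the concrete store actors started elsewhere:

package akka.cluster.sharding.internal

import akka.actor.ActorRef
import akka.testkit.TestProbe

object RememberEntitiesStoreProtocolSketch {
  // One round of the SPI conversation: a single Update for both starts and stops,
  // and no further Update until the matching UpdateDone has arrived.
  def writeOneBatch(store: ActorRef, probe: TestProbe): Unit = {
    store.tell(RememberEntitiesShardStore.Update(started = Set("1", "2"), stopped = Set("3")), probe.ref)
    probe.expectMsg(RememberEntitiesShardStore.UpdateDone(Set("1", "2"), Set("3")))
  }
}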

View file

@ -22,6 +22,7 @@ import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion._
import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm }
import akka.cluster.sharding.internal.EventSourcedRememberShards.{ MigrationMarker, State => RememberShardsState }
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ State => EntityState }
import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages
import akka.protobufv3.internal.MessageLite
import akka.serialization.BaseSerializer
@ -32,7 +33,7 @@ import java.io.NotSerializableException
import akka.actor.Address
import akka.cluster.sharding.ShardRegion._
-import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.EntitiesStarted
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ EntitiesStarted, EntitiesStopped }
import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages
/**
@ -44,7 +45,6 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
with BaseSerializer {
import Shard.{ CurrentShardState, GetCurrentShardState }
import Shard.{ GetShardStats, ShardStats }
-import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ State => EntityState, EntityStopped }
import ShardCoordinator.Internal._
private final val BufferSize = 1024 * 4
@ -74,6 +74,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
private val EntityStartedManifest = "CB"
private val EntityStoppedManifest = "CD"
private val EntitiesStartedManifest = "CE"
private val EntitiesStoppedManifest = "CF"
private val StartEntityManifest = "EA"
private val StartEntityAckManifest = "EB"
@ -101,6 +102,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
EntityStartedManifest -> entityStartedFromBinary,
EntitiesStartedManifest -> entitiesStartedFromBinary,
EntityStoppedManifest -> entityStoppedFromBinary,
EntitiesStoppedManifest -> entitiesStoppedFromBinary,
CoordinatorStateManifest -> coordinatorStateFromBinary,
ShardRegionRegisteredManifest -> { bytes =>
ShardRegionRegistered(actorRefMessageFromBinary(bytes))
@ -203,7 +205,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
override def manifest(obj: AnyRef): String = obj match {
case _: EntityState => EntityStateManifest
case _: EntitiesStarted => EntitiesStartedManifest
-case _: EntityStopped => EntityStoppedManifest
case _: EntitiesStopped => EntitiesStoppedManifest
case _: State => CoordinatorStateManifest
case _: ShardRegionRegistered => ShardRegionRegisteredManifest
@ -276,7 +278,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
case m: EntityState => entityStateToProto(m).toByteArray
case m: EntitiesStarted => entitiesStartedToProto(m).toByteArray
-case m: EntityStopped => entityStoppedToProto(m).toByteArray
case m: EntitiesStopped => entitiesStoppedToProto(m).toByteArray
case s: StartEntity => startEntityToByteArray(s)
case s: StartEntityAck => startEntityAckToByteArray(s)
@ -417,11 +419,14 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
private def entitiesStartedFromBinary(bytes: Array[Byte]): EntitiesStarted =
EntitiesStarted(sm.EntitiesStarted.parseFrom(bytes).getEntityIdList.asScala.toSet)
-private def entityStoppedToProto(evt: EntityStopped): sm.EntityStopped =
-sm.EntityStopped.newBuilder().setEntityId(evt.entityId).build()
-private def entityStoppedFromBinary(bytes: Array[Byte]): EntityStopped =
-EntityStopped(sm.EntityStopped.parseFrom(bytes).getEntityId)
private def entitiesStoppedToProto(evt: EntitiesStopped): sm.EntitiesStopped =
sm.EntitiesStopped.newBuilder().addAllEntityId(evt.entities.asJava).build()
private def entityStoppedFromBinary(bytes: Array[Byte]): EntitiesStopped =
EntitiesStopped(Set(sm.EntityStopped.parseFrom(bytes).getEntityId))
private def entitiesStoppedFromBinary(bytes: Array[Byte]): EntitiesStopped =
EntitiesStopped(sm.EntitiesStopped.parseFrom(bytes).getEntityIdList.asScala.toSet)
private def shardStatsToProto(evt: ShardStats): sm.ShardStats =
sm.ShardStats.newBuilder().setShard(evt.shardId).setEntityCount(evt.entityCount).build()
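
The old "CD" manifest stays registered, so events written by older nodes as a single EntityStopped are still readable and are now surfaced as a one-element EntitiesStopped, while new writes use the batched "CF" format. A hedged sketch of that read path, redoing the private helper's logic inline against the generated protobuf classes:

import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm }

object OldStopEventCompat extends App {
  // bytes as an old node wrote them: one EntityStopped ("CD" manifest) per stopped entity
  val oldBytes: Array[Byte] =
    sm.EntityStopped.newBuilder().setEntityId("42").build().toByteArray
  // what the reader now makes of them, mirroring entityStoppedFromBinary above:
  // a one-element batch of stopped entities
  val asBatch: Set[String] = Set(sm.EntityStopped.parseFrom(oldBytes).getEntityId)
  assert(asBatch == Set("42"))
}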

View file

@ -5,7 +5,15 @@
package akka.cluster.sharding
import akka.actor.ActorRef
import akka.cluster.sharding
-import akka.cluster.sharding.Shard.{ Active, Passivating, RememberedButNotCreated, Remembering, Stopped }
import akka.cluster.sharding.Shard.{
Active,
NoState,
Passivating,
PassivationComplete,
RememberedButNotCreated,
RememberingStart,
RememberingStop
}
import akka.event.NoLogging
import akka.util.OptionVal
import org.scalatest.matchers.should.Matchers
@ -15,62 +23,81 @@ class EntitiesSpec extends AnyWordSpec with Matchers {
"Entities" should {
"start empty" in {
-val entities = new sharding.Shard.Entities(NoLogging)
val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = false)
entities.activeEntityIds() shouldEqual Set.empty
entities.size shouldEqual 0
entities.activeEntities() shouldEqual Set.empty
}
"set already remembered entities to state RememberedButNotStarted" in {
-val entities = new sharding.Shard.Entities(NoLogging)
val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
val ids = Set("a", "b", "c")
entities.alreadyRemembered(ids)
entities.activeEntities() shouldEqual Set.empty
entities.size shouldEqual 3
ids.foreach { id =>
-entities.entityState(id) shouldEqual OptionVal.Some(RememberedButNotCreated)
entities.entityState(id) shouldEqual RememberedButNotCreated
}
}
-"set state to terminating" in {
-val entities = new sharding.Shard.Entities(NoLogging)
-val ref = ActorRef.noSender
-entities.addEntity("a", ref)
-entities.terminated(ref)
-entities.entityState("a") shouldEqual OptionVal.Some(Stopped)
-entities.activeEntities() shouldEqual Set.empty
"set state to remembering start" in {
val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
entities.rememberingStart("a", None)
entities.entityState("a") shouldEqual RememberingStart(None)
entities.pendingRememberedEntitiesExist() should ===(true)
val (starts, stops) = entities.pendingRememberEntities()
starts.keySet should contain("a")
stops should be(empty)
// also verify removal from pending once it starts
entities.addEntity("a", ActorRef.noSender)
entities.pendingRememberedEntitiesExist() should ===(false)
entities.pendingRememberEntities()._1 should be(empty)
}
"set state to remembering stop" in {
val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
entities.addEntity("a", ActorRef.noSender) // need to go through active to passivate
entities.entityPassivating("a") // need to go through passivate to remember stop
entities.rememberingStop("a", PassivationComplete)
entities.entityState("a") shouldEqual RememberingStop(PassivationComplete)
entities.pendingRememberedEntitiesExist() should ===(true)
val (starts, stops) = entities.pendingRememberEntities()
stops should contain("a")
starts should be(empty)
// also verify removal from pending once it stops
entities.removeEntity("a")
entities.pendingRememberedEntitiesExist() should ===(false)
entities.pendingRememberEntities()._2 should be(empty)
} }
"set state to remembering" in {
val entities = new sharding.Shard.Entities(NoLogging)
entities.remembering("a")
entities.entityState("a") shouldEqual OptionVal.Some(Remembering)
}
"fully remove an entity" in { "fully remove an entity" in {
val entities = new sharding.Shard.Entities(NoLogging) val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
val ref = ActorRef.noSender val ref = ActorRef.noSender
entities.addEntity("a", ref) entities.addEntity("a", ref)
entities.entityPassivating("a") // needs to go through passivating to be removed
entities.removeEntity("a") entities.removeEntity("a")
entities.entityState("a") shouldEqual OptionVal.None entities.entityState("a") shouldEqual NoState
entities.activeEntities() shouldEqual Set.empty entities.activeEntities() should be(empty)
entities.activeEntityIds() should be(empty)
} }
"add an entity as active" in { "add an entity as active" in {
val entities = new sharding.Shard.Entities(NoLogging) val entities = new sharding.Shard.Entities(NoLogging, false)
val ref = ActorRef.noSender val ref = ActorRef.noSender
entities.addEntity("a", ref) entities.addEntity("a", ref)
entities.entityState("a") shouldEqual OptionVal.Some(Active(ref)) entities.entityState("a") shouldEqual Active(ref)
} }
"look up actor ref by id" in { "look up actor ref by id" in {
val entities = new sharding.Shard.Entities(NoLogging) val entities = new sharding.Shard.Entities(NoLogging, false)
val ref = ActorRef.noSender val ref = ActorRef.noSender
entities.addEntity("a", ref) entities.addEntity("a", ref)
entities.entityId(ref) shouldEqual OptionVal.Some("a") entities.entityId(ref) shouldEqual OptionVal.Some("a")
} }
"set state to passivating" in { "set state to passivating" in {
val entities = new sharding.Shard.Entities(NoLogging) val entities = new sharding.Shard.Entities(NoLogging, false)
val ref = ActorRef.noSender val ref = ActorRef.noSender
entities.addEntity("a", ref) entities.addEntity("a", ref)
entities.entityPassivating("a") entities.entityPassivating("a")
entities.entityState("a") shouldEqual OptionVal.Some(Passivating(ref)) entities.entityState("a") shouldEqual Passivating(ref)
} }
} }
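
Taken together, the tests above walk the entity lifecycle that Shard.Entities tracks when remember-entities is on. A condensed sketch of the happy path, reusing only calls shown in this spec and assuming it compiles in the akka.cluster.sharding package where these internals are visible:

import akka.actor.ActorRef
import akka.cluster.sharding.Shard
import akka.cluster.sharding.Shard.{ NoState, PassivationComplete }
import akka.event.NoLogging

val entities = new Shard.Entities(NoLogging, rememberingEntities = true)

entities.rememberingStart("a", None)               // start requested, pending store write
entities.addEntity("a", ActorRef.noSender)         // write acked -> Active, no longer pending
entities.entityPassivating("a")                    // passivation in progress -> Passivating
entities.rememberingStop("a", PassivationComplete) // stop requested, pending store write
entities.removeEntity("a")                         // write acked -> NoState, no longer pending

assert(entities.entityState("a") == NoState)
assert(!entities.pendingRememberedEntitiesExist())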

View file

@@ -0,0 +1,114 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.cluster.{ Cluster, MemberStatus }
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object RememberEntitiesBatchedUpdatesSpec {
case class EntityEnvelope(id: Int, msg: Any)
object EntityActor {
case class Started(id: Int)
case class Stopped(id: Int)
def props(probe: ActorRef) = Props(new EntityActor(probe))
}
class EntityActor(probe: ActorRef) extends Actor with ActorLogging {
import EntityActor._
probe ! Started(self.path.name.toInt)
override def receive: Receive = {
case "stop" =>
log.debug("Got stop message, stopping")
context.stop(self)
case "graceful-stop" =>
log.debug("Got a graceful stop, requesting passivation")
context.parent ! ShardRegion.Passivate("stop")
case "start" =>
log.debug("Got a start")
case "ping" =>
}
override def postStop(): Unit = {
probe ! Stopped(self.path.name.toInt)
}
}
def config = ConfigFactory.parseString("""
akka.loglevel=DEBUG
# akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.provider = cluster
akka.remote.artery.canonical.port = 0
akka.remote.classic.netty.tcp.port = 0
akka.cluster.sharding.state-store-mode = ddata
akka.cluster.sharding.remember-entities = on
# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
""".stripMargin)
}
class RememberEntitiesBatchedUpdatesSpec
extends AkkaSpec(RememberEntitiesBatchedUpdatesSpec.config)
with AnyWordSpecLike
with ImplicitSender {
import RememberEntitiesBatchedUpdatesSpec._
val extractEntityId: ShardRegion.ExtractEntityId = {
case EntityEnvelope(id, payload) => (id.toString, payload)
}
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(_, _) => "1" // single shard for all entities
case ShardRegion.StartEntity(_) => "1"
}
override def atStartup(): Unit = {
// Form a one node cluster
val cluster = Cluster(system)
cluster.join(cluster.selfAddress)
awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1))
}
"Batching of starts and stops" must {
"work" in {
val probe = TestProbe()
val sharding = ClusterSharding(system).start(
"batching",
EntityActor.props(probe.ref),
ClusterShardingSettings(system),
extractEntityId,
extractShardId)
// make sure that sharding is up and running
sharding.tell(EntityEnvelope(0, "ping"), probe.ref)
probe.expectMsg(EntityActor.Started(0))
// start 20, should write first and batch the rest
(1 to 20).foreach { i =>
sharding ! EntityEnvelope(i, "start")
}
probe.receiveN(20)
// start 20 more, and stop the previous ones that are already running,
// should create a mixed batch of start + stops
(21 to 40).foreach { i =>
sharding ! EntityEnvelope(i, "start")
sharding ! EntityEnvelope(i - 20, "graceful-stop")
}
probe.receiveN(40)
// stop the last 20, should batch stops only
(21 to 40).foreach { i =>
sharding ! EntityEnvelope(i, "graceful-stop")
}
probe.receiveN(20)
}
}
}
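
The graceful-stop path in this test is what makes stop batching observable: an entity never stops on its own, it asks its parent shard to passivate it and only stops once the shard echoes the chosen stop message back, which lets the shard fold the resulting remember-entities stop into a batch together with other pending starts and stops. A minimal sketch of that handshake from the entity's side, using the public ShardRegion.Passivate API (illustrative, not code from this commit):

import akka.actor.{ Actor, Props }
import akka.cluster.sharding.ShardRegion

// entity that passivates itself when asked, mirroring the "graceful-stop" handling above
class PassivatingEntity extends Actor {
  override def receive: Receive = {
    case "graceful-stop" =>
      // ask the parent shard to stop this entity; the shard replies with the stop message
      context.parent ! ShardRegion.Passivate(stopMessage = "stop")
    case "stop" =>
      // only now does the entity stop, so the shard can record the stop before forgetting it
      context.stop(self)
  }
}

object PassivatingEntity {
  def props: Props = Props(new PassivatingEntity)
}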

View file

@@ -8,7 +8,6 @@ import akka.Done import akka.Done
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Timers } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Timers }
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
import akka.cluster.sharding.ShardRegion.EntityId
import akka.cluster.sharding.ShardRegion.ShardId import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.internal.RememberEntitiesCoordinatorStore import akka.cluster.sharding.internal.RememberEntitiesCoordinatorStore
import akka.cluster.sharding.internal.RememberEntitiesShardStore import akka.cluster.sharding.internal.RememberEntitiesShardStore
@@ -44,6 +43,7 @@ object RememberEntitiesFailureSpec {
""") """)
class EntityActor extends Actor with ActorLogging { class EntityActor extends Actor with ActorLogging {
log.info("Entity actor [{}] starting up", context.self.path.name)
override def receive: Receive = { override def receive: Receive = {
case "stop" => case "stop" =>
log.info("Stopping myself!") log.info("Stopping myself!")
@@ -87,10 +87,8 @@ object RememberEntitiesFailureSpec {
object FakeShardStoreActor { object FakeShardStoreActor {
def props(shardId: ShardId): Props = Props(new FakeShardStoreActor(shardId)) def props(shardId: ShardId): Props = Props(new FakeShardStoreActor(shardId))
case class FailAddEntity(entityId: Set[EntityId], whichWay: Fail) case class FailUpdateEntity(whichWay: Fail)
case class FailRemoveEntity(entityId: EntityId, whichWay: Fail) case object ClearFail
case class ClearAddFail(entityId: Set[EntityId])
case class ClearRemoveFail(entityId: EntityId)
case class Delayed(replyTo: ActorRef, msg: Any) case class Delayed(replyTo: ActorRef, msg: Any)
} }
@@ -98,8 +96,7 @@ object RememberEntitiesFailureSpec {
import FakeShardStoreActor._ import FakeShardStoreActor._
implicit val ec = context.system.dispatcher implicit val ec = context.system.dispatcher
private var failAddEntity = Map.empty[Set[EntityId], Fail] private var failUpdate: Option[Fail] = None
private var failRemoveEntity = Map.empty[EntityId, Fail]
context.system.eventStream.publish(ShardStoreCreated(self, shardId)) context.system.eventStream.publish(ShardStoreCreated(self, shardId))
@@ -114,9 +111,9 @@ object RememberEntitiesFailureSpec {
log.debug("Delaying initial entities listing with {}", howLong) log.debug("Delaying initial entities listing with {}", howLong)
timers.startSingleTimer("get-entities-delay", Delayed(sender(), Set.empty), howLong) timers.startSingleTimer("get-entities-delay", Delayed(sender(), Set.empty), howLong)
} }
case RememberEntitiesShardStore.AddEntities(entityId) => case RememberEntitiesShardStore.Update(started, stopped) =>
failAddEntity.get(entityId) match { failUpdate match {
case None => sender ! RememberEntitiesShardStore.UpdateDone(entityId) case None => sender ! RememberEntitiesShardStore.UpdateDone(started, stopped)
case Some(NoResponse) => log.debug("Sending no response for AddEntity") case Some(NoResponse) => log.debug("Sending no response for AddEntity")
case Some(CrashStore) => throw TestException("store crash on AddEntity") case Some(CrashStore) => throw TestException("store crash on AddEntity")
case Some(StopStore) => context.stop(self) case Some(StopStore) => context.stop(self)
@@ -124,27 +121,11 @@ object RememberEntitiesFailureSpec {
log.debug("Delaying response for AddEntity with {}", howLong) log.debug("Delaying response for AddEntity with {}", howLong)
timers.startSingleTimer("add-entity-delay", Delayed(sender(), Set.empty), howLong) timers.startSingleTimer("add-entity-delay", Delayed(sender(), Set.empty), howLong)
} }
case RememberEntitiesShardStore.RemoveEntity(entityId) => case FailUpdateEntity(whichWay) =>
failRemoveEntity.get(entityId) match { failUpdate = Some(whichWay)
case None => sender ! RememberEntitiesShardStore.UpdateDone(Set(entityId))
case Some(NoResponse) => log.debug("Sending no response for RemoveEntity")
case Some(CrashStore) => throw TestException("store crash on AddEntity")
case Some(StopStore) => context.stop(self)
case Some(Delay(howLong)) =>
log.debug("Delaying response for RemoveEntity with {}", howLong)
timers.startSingleTimer("remove-entity-delay", Delayed(sender(), Set.empty), howLong)
}
case FailAddEntity(id, whichWay) =>
failAddEntity = failAddEntity.updated(id, whichWay)
sender() ! Done sender() ! Done
case FailRemoveEntity(id, whichWay) => case ClearFail =>
failRemoveEntity = failRemoveEntity.updated(id, whichWay) failUpdate = None
sender() ! Done
case ClearAddFail(id) =>
failAddEntity = failAddEntity - id
sender() ! Done
case ClearRemoveFail(id) =>
failRemoveEntity = failRemoveEntity - id
sender() ! Done sender() ! Done
case Delayed(to, msg) => case Delayed(to, msg) =>
to ! msg to ! msg
@@ -217,7 +198,7 @@ class RememberEntitiesFailureSpec
"Remember entities handling in sharding" must { "Remember entities handling in sharding" must {
List(NoResponse, CrashStore, StopStore, Delay(1.second), Delay(2.seconds)).foreach { wayToFail: Fail => List(NoResponse, CrashStore, StopStore, Delay(500.millis), Delay(1.second)).foreach { wayToFail: Fail =>
s"recover when initial remember entities load fails $wayToFail" in { s"recover when initial remember entities load fails $wayToFail" in {
log.debug("Getting entities for shard 1 will fail") log.debug("Getting entities for shard 1 will fail")
failShardGetEntities = Map("1" -> wayToFail) failShardGetEntities = Map("1" -> wayToFail)
@@ -267,7 +248,7 @@ class RememberEntitiesFailureSpec
probe.expectMsg("hello-1") probe.expectMsg("hello-1")
// hit shard with other entity that will fail // hit shard with other entity that will fail
shardStore.tell(FakeShardStoreActor.FailAddEntity(Set("11"), wayToFail), storeProbe.ref) shardStore.tell(FakeShardStoreActor.FailUpdateEntity(wayToFail), storeProbe.ref)
storeProbe.expectMsg(Done) storeProbe.expectMsg(Done)
sharding.tell(EntityEnvelope(11, "hello-11"), probe.ref) sharding.tell(EntityEnvelope(11, "hello-11"), probe.ref)
@@ -280,7 +261,7 @@ class RememberEntitiesFailureSpec
} }
val stopFailingProbe = TestProbe() val stopFailingProbe = TestProbe()
shardStore.tell(FakeShardStoreActor.ClearAddFail(Set("11")), stopFailingProbe.ref) shardStore.tell(FakeShardStoreActor.ClearFail, stopFailingProbe.ref)
stopFailingProbe.expectMsg(Done) stopFailingProbe.expectMsg(Done)
// it takes a while - timeout hits and then backoff // it takes a while - timeout hits and then backoff
@@ -310,13 +291,13 @@ class RememberEntitiesFailureSpec
probe.expectMsg("hello-1") probe.expectMsg("hello-1")
// fail it when stopping // fail it when stopping
shard1Store.tell(FakeShardStoreActor.FailRemoveEntity("1", wayToFail), storeProbe.ref) shard1Store.tell(FakeShardStoreActor.FailUpdateEntity(wayToFail), storeProbe.ref)
storeProbe.expectMsg(Done) storeProbe.expectMsg(Done)
// FIXME restart without passivating is not saved and re-started again without storing the stop so this isn't testing anything // FIXME restart without passivating is not saved and re-started again without storing the stop so this isn't testing anything
sharding ! EntityEnvelope(1, "stop") sharding ! EntityEnvelope(1, "stop")
shard1Store.tell(FakeShardStoreActor.ClearRemoveFail("1"), storeProbe.ref) shard1Store.tell(FakeShardStoreActor.ClearFail, storeProbe.ref)
storeProbe.expectMsg(Done) storeProbe.expectMsg(Done)
// it takes a while - timeout hits and then backoff // it takes a while - timeout hits and then backoff
@@ -348,13 +329,17 @@ class RememberEntitiesFailureSpec
probe.expectMsg("hello-1") probe.expectMsg("hello-1")
// fail it when stopping // fail it when stopping
shard1Store.tell(FakeShardStoreActor.FailRemoveEntity("1", wayToFail), storeProbe.ref) shard1Store.tell(FakeShardStoreActor.FailUpdateEntity(wayToFail), storeProbe.ref)
storeProbe.expectMsg(Done) storeProbe.expectMsg(Done)
sharding ! EntityEnvelope(1, "graceful-stop") sharding ! EntityEnvelope(1, "graceful-stop")
shard1Store.tell(FakeShardStoreActor.ClearRemoveFail("1"), storeProbe.ref) if (wayToFail != CrashStore && wayToFail != StopStore) {
storeProbe.expectMsg(Done) // race, give the shard some time to see the passivation before restoring the fake shard store
Thread.sleep(250)
shard1Store.tell(FakeShardStoreActor.ClearFail, probe.ref)
probe.expectMsg(Done)
}
// it takes a while? // it takes a while?
awaitAssert({ awaitAssert({

View file

@@ -0,0 +1,75 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.internal
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.{ Cluster, MemberStatus }
import akka.testkit.{ AkkaSpec, ImplicitSender, WithLogCapturing }
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object DDataRememberEntitiesShardStoreSpec {
def config = ConfigFactory.parseString("""
akka.loglevel=DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.provider = cluster
akka.remote.artery.canonical.port = 0
akka.remote.classic.netty.tcp.port = 0
akka.cluster.sharding.state-store-mode = ddata
akka.cluster.sharding.remember-entities = on
# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
""".stripMargin)
}
// FIXME generalize to general test and cover both ddata and eventsourced
class DDataRememberEntitiesShardStoreSpec
extends AkkaSpec(DDataRememberEntitiesShardStoreSpec.config)
with AnyWordSpecLike
with ImplicitSender
with WithLogCapturing {
override def atStartup(): Unit = {
// Form a one node cluster
val cluster = Cluster(system)
cluster.join(cluster.selfAddress)
awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1))
}
"The DDataRememberEntitiesShardStore" must {
"store starts and stops and list remembered entity ids" in {
val replicatorSettings = ReplicatorSettings(system)
val replicator = system.actorOf(Replicator.props(replicatorSettings))
val shardingSettings = ClusterShardingSettings(system)
val store = system.actorOf(
DDataRememberEntitiesShardStore
.props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1))
store ! RememberEntitiesShardStore.GetEntities
expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should be(empty)
store ! RememberEntitiesShardStore.Update(Set("1", "2", "3"), Set.empty)
expectMsg(RememberEntitiesShardStore.UpdateDone(Set("1", "2", "3"), Set.empty))
store ! RememberEntitiesShardStore.Update(Set("4", "5", "6"), Set("2", "3"))
expectMsg(RememberEntitiesShardStore.UpdateDone(Set("4", "5", "6"), Set("2", "3")))
store ! RememberEntitiesShardStore.Update(Set.empty, Set("6"))
expectMsg(RememberEntitiesShardStore.UpdateDone(Set.empty, Set("6")))
store ! RememberEntitiesShardStore.Update(Set("2"), Set.empty)
expectMsg(RememberEntitiesShardStore.UpdateDone(Set("2"), Set.empty))
store ! RememberEntitiesShardStore.GetEntities
expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5"))
}
}
}
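
The sequence of Update/UpdateDone pairs in this spec pins down the store semantics: the remembered set is the union of all started ids minus the ids later stopped, and GetEntities returns whatever survives. A small sketch of that bookkeeping, replaying the batches from the test above (illustrative only, not the store implementation):

// (started, stopped) pairs as sent to the store in the test
val updates = List(
  (Set("1", "2", "3"), Set.empty[String]),
  (Set("4", "5", "6"), Set("2", "3")),
  (Set.empty[String], Set("6")),
  (Set("2"), Set.empty[String]))

// apply each batch: add the started ids, drop the stopped ids
val remembered = updates.foldLeft(Set.empty[String]) {
  case (acc, (started, stopped)) => (acc ++ started) -- stopped
}

// remembered == Set("1", "2", "4", "5"), matching the final GetEntities assertion
assert(remembered == Set("1", "2", "4", "5"))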

View file

@@ -76,7 +76,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
"be able to serialize PersistentShard domain events" in { "be able to serialize PersistentShard domain events" in {
checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStarted(Set("e1", "e2"))) checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStarted(Set("e1", "e2")))
checkSerialization(EventSourcedRememberEntitiesStore.EntityStopped("e1")) checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStopped(Set("e1", "e2")))
} }
"be able to deserialize old entity started event into entities started" in { "be able to deserialize old entity started event into entities started" in {