From 86aa42cf6c99fb605d66a11d6e1bbb3a52b9de24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 22 May 2017 10:08:18 +0200 Subject: [PATCH] remember entities and changing shardIdExtractor (#22894) * Test case covering changing shard id extractor with remember-entities * This should do the trick * Feedback addressed * Docs and migration guide mention * Correct logic to persist that entity has moved off off shard --- .../protobuf/msg/ClusterShardingMessages.java | 1150 ++++++++++++++++- .../protobuf/ClusterShardingMessages.proto | 9 + .../scala/akka/cluster/sharding/Shard.scala | 83 +- .../akka/cluster/sharding/ShardRegion.scala | 25 + .../ClusterShardingMessageSerializer.scala | 52 +- ...dingRememberEntitiesNewExtractorSpec.scala | 296 +++++ .../src/test/resources/reference.conf | 2 +- ...ClusterShardingMessageSerializerSpec.scala | 24 +- .../src/main/paradox/java/cluster-sharding.md | 15 +- .../main/paradox/scala/cluster-sharding.md | 15 +- .../project/migration-guide-2.4.x-2.5.x.md | 12 + 11 files changed, 1648 insertions(+), 35 deletions(-) create mode 100644 akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala diff --git a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java index 18170e75d7..b68ffc731a 100644 --- a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java +++ b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java @@ -6159,6 +6159,1128 @@ public final class ClusterShardingMessages { // @@protoc_insertion_point(class_scope:ShardStats) } + public interface StartEntityOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required string entityId = 1; + /** + * required string entityId = 1; + */ + boolean hasEntityId(); + /** + * required string entityId = 1; + */ + java.lang.String getEntityId(); + /** + * required string entityId = 1; + */ + akka.protobuf.ByteString + getEntityIdBytes(); + } + /** + * Protobuf type {@code StartEntity} + */ + public static final class StartEntity extends + akka.protobuf.GeneratedMessage + implements StartEntityOrBuilder { + // Use StartEntity.newBuilder() to construct. + private StartEntity(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private StartEntity(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final StartEntity defaultInstance; + public static StartEntity getDefaultInstance() { + return defaultInstance; + } + + public StartEntity getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private StartEntity( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + entityId_ = input.readBytes(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public StartEntity parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new StartEntity(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string entityId = 1; + public static final int ENTITYID_FIELD_NUMBER = 1; + private java.lang.Object entityId_; + /** + * required string entityId = 1; + */ + public boolean hasEntityId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string entityId = 1; + */ + public java.lang.String getEntityId() { + java.lang.Object ref = entityId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + entityId_ = s; + } + return s; + } + } + /** + * required string entityId = 1; + */ + public akka.protobuf.ByteString + getEntityIdBytes() { + java.lang.Object ref = entityId_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + entityId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + private void initFields() { + entityId_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasEntityId()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getEntityIdBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, getEntityIdBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code StartEntity} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.Builder.class); + } + + // Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + entityId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_descriptor; + } + + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity getDefaultInstanceForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.getDefaultInstance(); + } + + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity build() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity buildPartial() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.entityId_ = entityId_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity) { + return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity other) { + if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.getDefaultInstance()) return this; + if (other.hasEntityId()) { + bitField0_ |= 0x00000001; + entityId_ = other.entityId_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasEntityId()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string entityId = 1; + private java.lang.Object entityId_ = ""; + /** + * required string entityId = 1; + */ + public boolean hasEntityId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string entityId = 1; + */ + public java.lang.String getEntityId() { + java.lang.Object ref = entityId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + entityId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string entityId = 1; + */ + public akka.protobuf.ByteString + getEntityIdBytes() { + java.lang.Object ref = entityId_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + entityId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string entityId = 1; + */ + public Builder setEntityId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + entityId_ = value; + onChanged(); + return this; + } + /** + * required string entityId = 1; + */ + public Builder clearEntityId() { + bitField0_ = (bitField0_ & ~0x00000001); + entityId_ = getDefaultInstance().getEntityId(); + onChanged(); + return this; + } + /** + * required string entityId = 1; + */ + public Builder setEntityIdBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + entityId_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:StartEntity) + } + + static { + defaultInstance = new StartEntity(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:StartEntity) + } + + public interface StartEntityAckOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required string entityId = 1; + /** + * required string entityId = 1; + */ + boolean hasEntityId(); + /** + * required string entityId = 1; + */ + java.lang.String getEntityId(); + /** + * required string entityId = 1; + */ + akka.protobuf.ByteString + getEntityIdBytes(); + + // required string shardId = 2; + /** + * required string shardId = 2; + */ + boolean hasShardId(); + /** + * required string shardId = 2; + */ + java.lang.String getShardId(); + /** + * required string shardId = 2; + */ + akka.protobuf.ByteString + getShardIdBytes(); + } + /** + * Protobuf type {@code StartEntityAck} + */ + public static final class StartEntityAck extends + akka.protobuf.GeneratedMessage + implements StartEntityAckOrBuilder { + // Use StartEntityAck.newBuilder() to construct. + private StartEntityAck(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private StartEntityAck(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final StartEntityAck defaultInstance; + public static StartEntityAck getDefaultInstance() { + return defaultInstance; + } + + public StartEntityAck getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private StartEntityAck( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + entityId_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + shardId_ = input.readBytes(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public StartEntityAck parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new StartEntityAck(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string entityId = 1; + public static final int ENTITYID_FIELD_NUMBER = 1; + private java.lang.Object entityId_; + /** + * required string entityId = 1; + */ + public boolean hasEntityId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string entityId = 1; + */ + public java.lang.String getEntityId() { + java.lang.Object ref = entityId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + entityId_ = s; + } + return s; + } + } + /** + * required string entityId = 1; + */ + public akka.protobuf.ByteString + getEntityIdBytes() { + java.lang.Object ref = entityId_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + entityId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + // required string shardId = 2; + public static final int SHARDID_FIELD_NUMBER = 2; + private java.lang.Object shardId_; + /** + * required string shardId = 2; + */ + public boolean hasShardId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string shardId = 2; + */ + public java.lang.String getShardId() { + java.lang.Object ref = shardId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + shardId_ = s; + } + return s; + } + } + /** + * required string shardId = 2; + */ + public akka.protobuf.ByteString + getShardIdBytes() { + java.lang.Object ref = shardId_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + shardId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + private void initFields() { + entityId_ = ""; + shardId_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasEntityId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasShardId()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getEntityIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getShardIdBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, getEntityIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(2, getShardIdBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code StartEntityAck} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAckOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.Builder.class); + } + + // Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + entityId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + shardId_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_descriptor; + } + + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck getDefaultInstanceForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.getDefaultInstance(); + } + + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck build() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck buildPartial() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.entityId_ = entityId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.shardId_ = shardId_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck) { + return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck other) { + if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.getDefaultInstance()) return this; + if (other.hasEntityId()) { + bitField0_ |= 0x00000001; + entityId_ = other.entityId_; + onChanged(); + } + if (other.hasShardId()) { + bitField0_ |= 0x00000002; + shardId_ = other.shardId_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasEntityId()) { + + return false; + } + if (!hasShardId()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string entityId = 1; + private java.lang.Object entityId_ = ""; + /** + * required string entityId = 1; + */ + public boolean hasEntityId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string entityId = 1; + */ + public java.lang.String getEntityId() { + java.lang.Object ref = entityId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + entityId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string entityId = 1; + */ + public akka.protobuf.ByteString + getEntityIdBytes() { + java.lang.Object ref = entityId_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + entityId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string entityId = 1; + */ + public Builder setEntityId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + entityId_ = value; + onChanged(); + return this; + } + /** + * required string entityId = 1; + */ + public Builder clearEntityId() { + bitField0_ = (bitField0_ & ~0x00000001); + entityId_ = getDefaultInstance().getEntityId(); + onChanged(); + return this; + } + /** + * required string entityId = 1; + */ + public Builder setEntityIdBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + entityId_ = value; + onChanged(); + return this; + } + + // required string shardId = 2; + private java.lang.Object shardId_ = ""; + /** + * required string shardId = 2; + */ + public boolean hasShardId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string shardId = 2; + */ + public java.lang.String getShardId() { + java.lang.Object ref = shardId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + shardId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string shardId = 2; + */ + public akka.protobuf.ByteString + getShardIdBytes() { + java.lang.Object ref = shardId_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + shardId_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string shardId = 2; + */ + public Builder setShardId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + shardId_ = value; + onChanged(); + return this; + } + /** + * required string shardId = 2; + */ + public Builder clearShardId() { + bitField0_ = (bitField0_ & ~0x00000002); + shardId_ = getDefaultInstance().getShardId(); + onChanged(); + return this; + } + /** + * required string shardId = 2; + */ + public Builder setShardIdBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + shardId_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:StartEntityAck) + } + + static { + defaultInstance = new StartEntityAck(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:StartEntityAck) + } + private static akka.protobuf.Descriptors.Descriptor internal_static_CoordinatorState_descriptor; private static @@ -6209,6 +7331,16 @@ public final class ClusterShardingMessages { private static akka.protobuf.GeneratedMessage.FieldAccessorTable internal_static_ShardStats_fieldAccessorTable; + private static akka.protobuf.Descriptors.Descriptor + internal_static_StartEntity_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_StartEntity_fieldAccessorTable; + private static akka.protobuf.Descriptors.Descriptor + internal_static_StartEntityAck_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_StartEntityAck_fieldAccessorTable; public static akka.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -6231,8 +7363,10 @@ public final class ClusterShardingMessages { "ties\030\001 \003(\t\"!\n\rEntityStarted\022\020\n\010entityId\030" + "\001 \002(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t" + "\"0\n\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityC" + - "ount\030\002 \002(\005B&\n\"akka.cluster.sharding.prot" + - "obuf.msgH\001" + "ount\030\002 \002(\005\"\037\n\013StartEntity\022\020\n\010entityId\030\001 " + + "\002(\t\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022" + + "\017\n\007shardId\030\002 \002(\tB&\n\"akka.cluster.shardin" + + "g.protobuf.msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6299,6 +7433,18 @@ public final class ClusterShardingMessages { akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ShardStats_descriptor, new java.lang.String[] { "Shard", "EntityCount", }); + internal_static_StartEntity_descriptor = + getDescriptor().getMessageTypes().get(9); + internal_static_StartEntity_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_StartEntity_descriptor, + new java.lang.String[] { "EntityId", }); + internal_static_StartEntityAck_descriptor = + getDescriptor().getMessageTypes().get(10); + internal_static_StartEntityAck_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_StartEntityAck_descriptor, + new java.lang.String[] { "EntityId", "ShardId", }); return null; } }; diff --git a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto index a5fab0a382..1493e58662 100644 --- a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto +++ b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto @@ -52,3 +52,12 @@ message ShardStats { required string shard = 1; required int32 entityCount = 2; } + +message StartEntity { + required string entityId = 1; +} + +message StartEntityAck { + required string entityId = 1; + required string shardId = 2; +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 8c27304903..41af79ca98 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -47,7 +47,7 @@ private[akka] object Shard { sealed trait ShardCommand /** - * When an remembering entities and the entity stops without issuing a `Passivate`, we + * When remembering entities and the entity stops without issuing a `Passivate`, we * restart it after a back off using this message. */ final case class RestartEntity(entity: EntityId) extends ShardCommand @@ -170,6 +170,8 @@ private[akka] class Shard( case Terminated(ref) ⇒ receiveTerminated(ref) case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) case msg: ShardCommand ⇒ receiveShardCommand(msg) + case msg: ShardRegion.StartEntity ⇒ receiveStartEntity(msg) + case msg: ShardRegion.StartEntityAck ⇒ receiveStartEntityAck(msg) case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg) case msg: ShardQuery ⇒ receiveShardQuery(msg) case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) @@ -177,7 +179,27 @@ private[akka] class Shard( def receiveShardCommand(msg: ShardCommand): Unit = msg match { case RestartEntity(id) ⇒ getEntity(id) - case RestartEntities(ids) ⇒ ids foreach getEntity + case RestartEntities(ids) ⇒ restartEntities(ids) + } + + def receiveStartEntity(start: ShardRegion.StartEntity): Unit = { + log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", sender(), start.entityId, shardId) + getEntity(start.entityId) + sender() ! ShardRegion.StartEntityAck(start.entityId, shardId) + } + + def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = { + if (ack.shardId != shardId && state.entities.contains(ack.entityId)) { + log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId) + processChange(EntityStopped(ack.entityId)) { _ ⇒ + state = state.copy(state.entities - ack.entityId) + messageBuffers.remove(ack.entityId) + } + } + } + + def restartEntities(ids: Set[EntityId]): Unit = { + context.actorOf(RememberEntityStarter.props(typeName, shardId, ids, settings, sender())) } def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { @@ -318,6 +340,63 @@ private[akka] class Shard( } } +private[akka] object RememberEntityStarter { + def props( + typeName: String, + shardId: ShardRegion.ShardId, + ids: Set[ShardRegion.EntityId], + settings: ClusterShardingSettings, + requestor: ActorRef) = + Props(new RememberEntityStarter(typeName, shardId, ids, settings, requestor)) +} + +/** + * INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled + */ +private[akka] class RememberEntityStarter( + typeName: String, + shardId: ShardRegion.ShardId, + ids: Set[ShardRegion.EntityId], + settings: ClusterShardingSettings, + requestor: ActorRef +) extends Actor { + + import context.dispatcher + import scala.concurrent.duration._ + + case object Tick + + val region = ClusterSharding(context.system).shardRegion(typeName) + var waitingForAck = ids + + sendStart(ids) + + val tickTask = { + val resendInterval = settings.tuningParameters.retryInterval + context.system.scheduler.schedule(resendInterval, resendInterval, self, Tick) + } + + def sendStart(ids: Set[ShardRegion.EntityId]): Unit = { + ids.foreach(id ⇒ region ! ShardRegion.StartEntity(id)) + } + + override def receive = { + case ack: ShardRegion.StartEntityAck ⇒ + waitingForAck -= ack.entityId + // inform whoever requested the start that it happened + requestor ! ack + if (waitingForAck.isEmpty) context.stop(self) + + case Tick ⇒ + sendStart(waitingForAck) + + } + + override def postStop(): Unit = { + tickTask.cancel() + } +} + /** * INTERNAL API: Common things for PersistentShard and DDataShard */ diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 1d80818714..27b807b8f8 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -4,6 +4,7 @@ package akka.cluster.sharding import java.net.URLEncoder + import akka.pattern.AskTimeoutException import akka.util.{ MessageBufferMap, Timeout } import akka.pattern.{ ask, pipe } @@ -12,6 +13,7 @@ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.Member import akka.cluster.MemberStatus + import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.Future @@ -300,6 +302,19 @@ object ShardRegion { */ private final case class RestartShard(shardId: ShardId) + /** + * When remembering entities and a shard is started, each entity id that needs to + * be running will trigger this message being sent through sharding. For this to work + * the message *must* be handled by the shard id extractor. + */ + final case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable + + /** + * Sent back when a `ShardRegion.StartEntity` message was received and triggered the entity + * to start (it does not guarantee the entity successfully started) + */ + final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId) extends ClusterShardingSerializable + private def roleOption(role: String): Option[String] = if (role == "") None else Option(role) @@ -438,6 +453,7 @@ private[akka] class ShardRegion( case cmd: ShardRegionCommand ⇒ receiveCommand(cmd) case query: ShardRegionQuery ⇒ receiveQuery(query) case msg: RestartShard ⇒ deliverMessage(msg, sender()) + case msg: StartEntity ⇒ deliverStartEntity(msg, sender()) case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) case unknownMsg ⇒ log.warning("Message does not have an extractor defined in shard [{}] so it was ignored: {}", typeName, unknownMsg) } @@ -699,6 +715,15 @@ private[akka] class ShardRegion( retryCount = 0 } + def deliverStartEntity(msg: StartEntity, snd: ActorRef): Unit = { + try { + deliverMessage(msg, snd) + } catch { + case ex: MatchError ⇒ + log.error(ex, "When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).") + } + } + def deliverMessage(msg: Any, snd: ActorRef): Unit = msg match { case RestartShard(shardId) ⇒ diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index 806075cc28..03442a42c0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -10,7 +10,6 @@ import java.util.zip.GZIPOutputStream import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.breakOut - import akka.actor.ActorRef import akka.actor.ExtendedActorSystem import akka.cluster.sharding.Shard @@ -18,11 +17,12 @@ import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages ⇒ sm } import akka.serialization.BaseSerializer import akka.serialization.Serialization -import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest import akka.protobuf.MessageLite import java.io.NotSerializableException +import akka.cluster.sharding.ShardRegion.{ StartEntity, StartEntityAck } + /** * INTERNAL API: Protobuf serializer of ClusterSharding messages. */ @@ -59,6 +59,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private val EntityStartedManifest = "CB" private val EntityStoppedManifest = "CD" + private val StartEntityManifest = "EA" + private val StartEntityAckManifest = "EB" + private val GetShardStatsManifest = "DA" private val ShardStatsManifest = "DB" @@ -89,7 +92,11 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy GracefulShutdownReqManifest → { bytes ⇒ GracefulShutdownReq(actorRefMessageFromBinary(bytes)) }, GetShardStatsManifest → { bytes ⇒ GetShardStats }, - ShardStatsManifest → { bytes ⇒ shardStatsFromBinary(bytes) }) + ShardStatsManifest → { bytes ⇒ shardStatsFromBinary(bytes) }, + + StartEntityManifest → { startEntityFromBinary(_) }, + StartEntityAckManifest → { startEntityAckFromBinary(_) } + ) override def manifest(obj: AnyRef): String = obj match { case _: EntityState ⇒ EntityStateManifest @@ -117,6 +124,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy case _: ShardStopped ⇒ ShardStoppedManifest case _: GracefulShutdownReq ⇒ GracefulShutdownReqManifest + case _: StartEntity ⇒ StartEntityManifest + case _: StartEntityAck ⇒ StartEntityAckManifest + case GetShardStats ⇒ GetShardStatsManifest case _: ShardStats ⇒ ShardStatsManifest case _ ⇒ @@ -146,12 +156,15 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy case GracefulShutdownReq(ref) ⇒ actorRefMessageToProto(ref).toByteArray - case m: EntityState ⇒ entityStateToProto(m).toByteArray - case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray - case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray + case m: EntityState ⇒ entityStateToProto(m).toByteArray + case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray + case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray - case GetShardStats ⇒ Array.emptyByteArray - case m: ShardStats ⇒ shardStatsToProto(m).toByteArray + case s: StartEntity ⇒ startEntityToByteArray(s) + case s: StartEntityAck ⇒ startEntityAckToByteArray(s) + + case GetShardStats ⇒ Array.emptyByteArray + case m: ShardStats ⇒ shardStatsToProto(m).toByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") @@ -266,6 +279,29 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy ShardStats(parsed.getShard, parsed.getEntityCount) } + private def startEntityToByteArray(s: StartEntity): Array[Byte] = { + val builder = sm.StartEntity.newBuilder() + builder.setEntityId(s.entityId) + builder.build().toByteArray + } + + private def startEntityFromBinary(bytes: Array[Byte]): StartEntity = { + val se = sm.StartEntity.parseFrom(bytes) + StartEntity(se.getEntityId) + } + + private def startEntityAckToByteArray(s: StartEntityAck): Array[Byte] = { + val builder = sm.StartEntityAck.newBuilder() + builder.setEntityId(s.entityId) + builder.setShardId(s.shardId) + builder.build().toByteArray + } + + private def startEntityAckFromBinary(bytes: Array[Byte]): StartEntityAck = { + val sea = sm.StartEntityAck.parseFrom(bytes) + StartEntityAck(sea.getEntityId, sea.getShardId) + } + private def resolveActorRef(path: String): ActorRef = { system.provider.resolveActorRef(path) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala new file mode 100644 index 0000000000..01a43a1616 --- /dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala @@ -0,0 +1,296 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.cluster.sharding + +import java.io.File + +import akka.actor._ +import akka.cluster.{ Cluster, MemberStatus } +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import org.apache.commons.io.FileUtils + +import scala.concurrent.duration._ + +object ClusterShardingRememberEntitiesNewExtractorSpec { + + final case class Started(ref: ActorRef) + + def props(probe: Option[ActorRef]): Props = Props(new TestEntity(probe)) + + class TestEntity(probe: Option[ActorRef]) extends Actor with ActorLogging { + log.info("Entity started: " + self.path) + probe.foreach(_ ! Started(self)) + + def receive = { + case m ⇒ sender() ! m + } + } + + val shardCount = 5 + + val extractEntityId: ShardRegion.ExtractEntityId = { + case id: Int ⇒ (id.toString, id) + } + + val extractShardId1: ShardRegion.ExtractShardId = { + case id: Int ⇒ (id % shardCount).toString + case ShardRegion.StartEntity(id) ⇒ extractShardId1(id.toInt) + } + + val extractShardId2: ShardRegion.ExtractShardId = { + // always bump it one shard id + case id: Int ⇒ ((id + 1) % shardCount).toString + case ShardRegion.StartEntity(id) ⇒ extractShardId2(id.toInt) + } + +} + +abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: String) extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = DEBUG + akka.actor.provider = "cluster" + akka.cluster.auto-down-unreachable-after = 0s + akka.remote.log-remote-lifecycle-events = off + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/ShardingRememberEntitiesNewExtractorSpec/journal" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/ShardingRememberEntitiesNewExtractorSpec/snapshots" + akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-ddata + map-size = 10 MiB + } + """)) + + val roleConfig = ConfigFactory.parseString( + """ + akka.cluster.roles = [sharding] + """) + + // we pretend node 4 and 5 are new incarnations of node 2 and 3 as they never run in parallel + // so we can use the same lmdb store for them and have node 4 pick up the persisted data of node 2 + val ddataNodeAConfig = ConfigFactory.parseString( + """ + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-node-a + } + """) + val ddataNodeBConfig = ConfigFactory.parseString( + """ + akka.cluster.sharding.distributed-data.durable.lmdb { + dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-node-b + } + """) + + nodeConfig(second)(roleConfig.withFallback(ddataNodeAConfig)) + nodeConfig(third)(roleConfig.withFallback(ddataNodeBConfig)) + nodeConfig(fourth)(roleConfig.withFallback(ddataNodeAConfig)) + nodeConfig(fifth)(roleConfig.withFallback(ddataNodeBConfig)) + +} + +object PersistentClusterShardingRememberEntitiesSpecNewExtractorConfig extends ClusterShardingRememberEntitiesNewExtractorSpecConfig( + ClusterShardingSettings.StateStoreModePersistence) +object DDataClusterShardingRememberEntitiesNewExtractorSpecConfig extends ClusterShardingRememberEntitiesNewExtractorSpecConfig( + ClusterShardingSettings.StateStoreModeDData) + +class PersistentClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec( + PersistentClusterShardingRememberEntitiesSpecNewExtractorConfig) + +class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec +class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec +class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec +class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec +class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec + +class DDataClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec( + DDataClusterShardingRememberEntitiesNewExtractorSpecConfig) + +class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends DDataClusterShardingRememberEntitiesNewExtractorSpec +class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends DDataClusterShardingRememberEntitiesNewExtractorSpec +class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends DDataClusterShardingRememberEntitiesNewExtractorSpec +class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends DDataClusterShardingRememberEntitiesNewExtractorSpec +class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends DDataClusterShardingRememberEntitiesNewExtractorSpec + +abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterShardingRememberEntitiesNewExtractorSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { + import ClusterShardingRememberEntitiesNewExtractorSpec._ + import config._ + + val typeName = "Entity" + + override def initialParticipants = roles.size + + val storageLocations = List(new File(system.settings.config.getString( + "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) + + override protected def atStartup() { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + enterBarrier("startup") + } + + override protected def afterTermination() { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir)) + } + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + } + enterBarrier(from.name + "-joined") + } + + val cluster = Cluster(system) + + def startShardingWithExtractor1(): Unit = { + ClusterSharding(system).start( + typeName = typeName, + entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(None), + settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"), + extractEntityId = extractEntityId, + extractShardId = extractShardId1) + } + + def startShardingWithExtractor2(): Unit = { + ClusterSharding(system).start( + typeName = typeName, + entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(testActor)), + settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"), + extractEntityId = extractEntityId, + extractShardId = extractShardId2) + } + + lazy val region = ClusterSharding(system).shardRegion(typeName) + + def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + + s"Cluster with min-nr-of-members using sharding ($mode)" must { + + if (!isDdataMode) { + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("persistence-started") + + runOn(second, third, fourth, fifth) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } + + enterBarrier("after-1") + } + } + + "start up first cluster and sharding" in within(15.seconds) { + join(first, first) + join(second, first) + join(third, first) + + runOn(first, second, third) { + within(remaining) { + awaitAssert { + cluster.state.members.count(_.status == MemberStatus.Up) should ===(3) + } + } + } + runOn(second, third) { + startShardingWithExtractor1() + } + enterBarrier("first-cluster-up") + + runOn(second, third) { + // one entity for each shard id + (1 to 10).foreach { n ⇒ + region ! n + expectMsg(n) + } + } + enterBarrier("first-cluster-entities-up") + } + + "shutdown sharding nodes" in within(30.seconds) { + runOn(first) { + testConductor.exit(second, 0).await + testConductor.exit(third, 0).await + } + runOn(first) { + within(remaining) { + awaitAssert { + cluster.state.members.count(_.status == MemberStatus.Up) should ===(1) + } + } + } + enterBarrier("first-sharding-cluster-stopped") + } + + "start new nodes with different extractor" in within(15.seconds) { + + // start it with a new shard id extractor, which will put the entities + // on different shards + + join(fourth, first) + join(fifth, first) + runOn(first) { + within(remaining) { + awaitAssert { + cluster.state.members.count(_.status == MemberStatus.Up) should ===(3) + } + } + } + runOn(fourth, fifth) { + startShardingWithExtractor2() + } + + // TODO how do we know that the shards has started?? + Thread.sleep(7000) + enterBarrier("new-nodes-started") + } + + "have the remembered entities running on the right shards" in within(15.seconds) { + runOn(fourth, fifth) { + var stats: ShardRegion.CurrentShardRegionState = null + within(remaining) { + awaitAssert { + region ! ShardRegion.GetShardRegionState + val reply = expectMsgType[ShardRegion.CurrentShardRegionState] + reply.shards should not be empty + stats = reply + } + } + + for { + shardState ← stats.shards + entityId ← shardState.entityIds + } { + val calculatedShardId = extractShardId2(entityId.toInt) + calculatedShardId should ===(shardState.shardId) + } + } + + enterBarrier("done") + } + + } +} + diff --git a/akka-cluster-sharding/src/test/resources/reference.conf b/akka-cluster-sharding/src/test/resources/reference.conf index eebfbdd0a2..589f6650ea 100644 --- a/akka-cluster-sharding/src/test/resources/reference.conf +++ b/akka-cluster-sharding/src/test/resources/reference.conf @@ -1,6 +1,6 @@ akka { actor { - serialize-creators = on + serialize-creators = off serialize-messages = on warn-about-java-serializer-usage = off } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala index d37c20ba31..80831f0036 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala @@ -3,11 +3,10 @@ */ package akka.cluster.sharding.protobuf -import akka.actor.{ ExtendedActorSystem } +import akka.actor.ExtendedActorSystem import akka.testkit.AkkaSpec import akka.actor.Props -import akka.cluster.sharding.ShardCoordinator -import akka.cluster.sharding.Shard +import akka.cluster.sharding.{ Shard, ShardCoordinator, ShardRegion } class ClusterShardingMessageSerializerSpec extends AkkaSpec { import ShardCoordinator.Internal._ @@ -28,7 +27,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec { "ClusterShardingMessageSerializer" must { - "be able to serializable ShardCoordinator snapshot State" in { + "be able to serialize ShardCoordinator snapshot State" in { val state = State( shards = Map("a" → region1, "b" → region2, "c" → region2), regions = Map(region1 → Vector("a"), region2 → Vector("b", "c"), region3 → Vector.empty[String]), @@ -37,7 +36,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec { checkSerialization(state) } - "be able to serializable ShardCoordinator domain events" in { + "be able to serialize ShardCoordinator domain events" in { checkSerialization(ShardRegionRegistered(region1)) checkSerialization(ShardRegionProxyRegistered(regionProxy1)) checkSerialization(ShardRegionTerminated(region1)) @@ -46,7 +45,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec { checkSerialization(ShardHomeDeallocated("a")) } - "be able to serializable ShardCoordinator remote messages" in { + "be able to serialize ShardCoordinator remote messages" in { checkSerialization(Register(region1)) checkSerialization(RegisterProxy(regionProxy1)) checkSerialization(RegisterAck(region1)) @@ -61,21 +60,26 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec { checkSerialization(GracefulShutdownReq(region1)) } - "be able to serializable PersistentShard snapshot state" in { + "be able to serialize PersistentShard snapshot state" in { checkSerialization(Shard.State(Set("e1", "e2", "e3"))) } - "be able to serializable PersistentShard domain events" in { + "be able to serialize PersistentShard domain events" in { checkSerialization(Shard.EntityStarted("e1")) checkSerialization(Shard.EntityStopped("e1")) } - "be able to serializable GetShardStats" in { + "be able to serialize GetShardStats" in { checkSerialization(Shard.GetShardStats) } - "be able to serializable ShardStats" in { + "be able to serialize ShardStats" in { checkSerialization(Shard.ShardStats("a", 23)) } + + "be able to serialize StartEntity" in { + checkSerialization(ShardRegion.StartEntity("42")) + checkSerialization(ShardRegion.StartEntityAck("13", "37")) + } } } diff --git a/akka-docs/src/main/paradox/java/cluster-sharding.md b/akka-docs/src/main/paradox/java/cluster-sharding.md index d67de743f8..5bfde4acd8 100644 --- a/akka-docs/src/main/paradox/java/cluster-sharding.md +++ b/akka-docs/src/main/paradox/java/cluster-sharding.md @@ -267,12 +267,15 @@ are thereafter delivered to a new incarnation of the entity. The list of entities in each `Shard` can be made persistent (durable) by setting the `rememberEntities` flag to true in `ClusterShardingSettings` when calling -`ClusterSharding.start`. When configured to remember entities, whenever a `Shard` -is rebalanced onto another node or recovers after a crash it will recreate all the -entities which were previously running in that `Shard`. To permanently stop entities, -a `Passivate` message must be sent to the parent of the entity actor, otherwise the -entity will be automatically restarted after the entity restart backoff specified in -the configuration. +`ClusterSharding.start` and making sure the `shardIdExtractor` handles +`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to +extract from the `EntityId`. + +When configured to remember entities, whenever a `Shard` is rebalanced onto another +node or recovers after a crash it will recreate all the entities which were previously +running in that `Shard`. To permanently stop entities, a `Passivate` message must be +sent to the parent of the entity actor, otherwise the entity will be automatically +restarted after the entity restart backoff specified in the configuration. When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the diff --git a/akka-docs/src/main/paradox/scala/cluster-sharding.md b/akka-docs/src/main/paradox/scala/cluster-sharding.md index 728b0dd4fb..361d66530c 100644 --- a/akka-docs/src/main/paradox/scala/cluster-sharding.md +++ b/akka-docs/src/main/paradox/scala/cluster-sharding.md @@ -270,12 +270,15 @@ are thereafter delivered to a new incarnation of the entity. The list of entities in each `Shard` can be made persistent (durable) by setting the `rememberEntities` flag to true in `ClusterShardingSettings` when calling -`ClusterSharding.start`. When configured to remember entities, whenever a `Shard` -is rebalanced onto another node or recovers after a crash it will recreate all the -entities which were previously running in that `Shard`. To permanently stop entities, -a `Passivate` message must be sent to the parent of the entity actor, otherwise the -entity will be automatically restarted after the entity restart backoff specified in -the configuration. +`ClusterSharding.start` and making sure the `shardIdExtractor` handles +`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to +extract from the `EntityId`. + +When configured to remember entities, whenever a `Shard` is rebalanced onto another +node or recovers after a crash it will recreate all the entities which were previously +running in that `Shard`. To permanently stop entities, a `Passivate` message must be +sent to the parent of the entity actor, otherwise the entity will be automatically +restarted after the entity restart backoff specified in the configuration. When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the diff --git a/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md b/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md index 20a94abf98..37e34b7c65 100644 --- a/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md +++ b/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md @@ -479,6 +479,18 @@ Note that the stored @ref:[Remembering Entities](../cluster-sharding.md#cluster- be migrated to the `data` mode. Such entities must be started again in some other way when using `ddata` mode. +### Cluster Sharding remember entities + +To use *remember entities* with cluster sharding there are now an additional requirement added: the +`extractShardId` must be able to extract the shard id from the message `Shard.StartEntity(EntityId)`. +This is implies that it must be possible to calculate a shard id from an entity id when using remember +entities. + +This was added to be able to gracefully handle when persisted locations of entities does not match +where the entities should live when a shard region starts up. Such states could be cause by changing +the `extractShardId` logic and restart a system using *remember entities*. + + ### Cluster Management Command Line Tool There is a new cluster management tool with HTTP API that has the same functionality as the command line tool.