diff --git a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
index 18170e75d7..b68ffc731a 100644
--- a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
+++ b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
@@ -6159,6 +6159,1128 @@ public final class ClusterShardingMessages {
// @@protoc_insertion_point(class_scope:ShardStats)
}
+ public interface StartEntityOrBuilder
+ extends akka.protobuf.MessageOrBuilder {
+
+ // required string entityId = 1;
+ /**
+ * required string entityId = 1;
+ */
+ boolean hasEntityId();
+ /**
+ * required string entityId = 1;
+ */
+ java.lang.String getEntityId();
+ /**
+ * required string entityId = 1;
+ */
+ akka.protobuf.ByteString
+ getEntityIdBytes();
+ }
+ /**
+ * Protobuf type {@code StartEntity}
+ */
+ public static final class StartEntity extends
+ akka.protobuf.GeneratedMessage
+ implements StartEntityOrBuilder {
+ // Use StartEntity.newBuilder() to construct.
+ private StartEntity(akka.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private StartEntity(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final StartEntity defaultInstance;
+ public static StartEntity getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public StartEntity getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final akka.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final akka.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private StartEntity(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ akka.protobuf.UnknownFieldSet.Builder unknownFields =
+ akka.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ entityId_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new akka.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.Builder.class);
+ }
+
+ public static akka.protobuf.Parser<StartEntity> PARSER =
+ new akka.protobuf.AbstractParser<StartEntity>() {
+ public StartEntity parsePartialFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return new StartEntity(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public akka.protobuf.Parser<StartEntity> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // required string entityId = 1;
+ public static final int ENTITYID_FIELD_NUMBER = 1;
+ private java.lang.Object entityId_;
+ /**
+ * required string entityId = 1;
+ */
+ public boolean hasEntityId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public java.lang.String getEntityId() {
+ java.lang.Object ref = entityId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ akka.protobuf.ByteString bs =
+ (akka.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ entityId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public akka.protobuf.ByteString
+ getEntityIdBytes() {
+ java.lang.Object ref = entityId_;
+ if (ref instanceof java.lang.String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ entityId_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+
+ private void initFields() {
+ entityId_ = "";
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasEntityId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getEntityIdBytes());
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeBytesSize(1, getEntityIdBytes());
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code StartEntity}
+ */
+ public static final class Builder extends
+ akka.protobuf.GeneratedMessage.Builder<Builder>
+ implements akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityOrBuilder {
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.Builder.class);
+ }
+
+ // Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ entityId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public akka.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntity_descriptor;
+ }
+
+ public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity getDefaultInstanceForType() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.getDefaultInstance();
+ }
+
+ public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity build() {
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity buildPartial() {
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.entityId_ = entityId_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(akka.protobuf.Message other) {
+ if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity) {
+ return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity other) {
+ if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity.getDefaultInstance()) return this;
+ if (other.hasEntityId()) {
+ bitField0_ |= 0x00000001;
+ entityId_ = other.entityId_;
+ onChanged();
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasEntityId()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntity) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required string entityId = 1;
+ private java.lang.Object entityId_ = "";
+ /**
+ * required string entityId = 1;
+ */
+ public boolean hasEntityId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public java.lang.String getEntityId() {
+ java.lang.Object ref = entityId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((akka.protobuf.ByteString) ref)
+ .toStringUtf8();
+ entityId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public akka.protobuf.ByteString
+ getEntityIdBytes() {
+ java.lang.Object ref = entityId_;
+ if (ref instanceof String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ entityId_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public Builder setEntityId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ entityId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public Builder clearEntityId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ entityId_ = getDefaultInstance().getEntityId();
+ onChanged();
+ return this;
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public Builder setEntityIdBytes(
+ akka.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ entityId_ = value;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:StartEntity)
+ }
+
+ static {
+ defaultInstance = new StartEntity(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:StartEntity)
+ }
+
+ public interface StartEntityAckOrBuilder
+ extends akka.protobuf.MessageOrBuilder {
+
+ // required string entityId = 1;
+ /**
+ * required string entityId = 1;
+ */
+ boolean hasEntityId();
+ /**
+ * required string entityId = 1;
+ */
+ java.lang.String getEntityId();
+ /**
+ * required string entityId = 1;
+ */
+ akka.protobuf.ByteString
+ getEntityIdBytes();
+
+ // required string shardId = 2;
+ /**
+ * required string shardId = 2;
+ */
+ boolean hasShardId();
+ /**
+ * required string shardId = 2;
+ */
+ java.lang.String getShardId();
+ /**
+ * required string shardId = 2;
+ */
+ akka.protobuf.ByteString
+ getShardIdBytes();
+ }
+ /**
+ * Protobuf type {@code StartEntityAck}
+ */
+ public static final class StartEntityAck extends
+ akka.protobuf.GeneratedMessage
+ implements StartEntityAckOrBuilder {
+ // Use StartEntityAck.newBuilder() to construct.
+ private StartEntityAck(akka.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private StartEntityAck(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final StartEntityAck defaultInstance;
+ public static StartEntityAck getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public StartEntityAck getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final akka.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final akka.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private StartEntityAck(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ akka.protobuf.UnknownFieldSet.Builder unknownFields =
+ akka.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ bitField0_ |= 0x00000001;
+ entityId_ = input.readBytes();
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000002;
+ shardId_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new akka.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.Builder.class);
+ }
+
+ public static akka.protobuf.Parser<StartEntityAck> PARSER =
+ new akka.protobuf.AbstractParser<StartEntityAck>() {
+ public StartEntityAck parsePartialFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return new StartEntityAck(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public akka.protobuf.Parser<StartEntityAck> getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // required string entityId = 1;
+ public static final int ENTITYID_FIELD_NUMBER = 1;
+ private java.lang.Object entityId_;
+ /**
+ * required string entityId = 1;
+ */
+ public boolean hasEntityId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public java.lang.String getEntityId() {
+ java.lang.Object ref = entityId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ akka.protobuf.ByteString bs =
+ (akka.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ entityId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public akka.protobuf.ByteString
+ getEntityIdBytes() {
+ java.lang.Object ref = entityId_;
+ if (ref instanceof java.lang.String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ entityId_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+
+ // required string shardId = 2;
+ public static final int SHARDID_FIELD_NUMBER = 2;
+ private java.lang.Object shardId_;
+ /**
+ * required string shardId = 2;
+ */
+ public boolean hasShardId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required string shardId = 2;
+ */
+ public java.lang.String getShardId() {
+ java.lang.Object ref = shardId_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ akka.protobuf.ByteString bs =
+ (akka.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ shardId_ = s;
+ }
+ return s;
+ }
+ }
+ /**
+ * required string shardId = 2;
+ */
+ public akka.protobuf.ByteString
+ getShardIdBytes() {
+ java.lang.Object ref = shardId_;
+ if (ref instanceof java.lang.String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ shardId_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+
+ private void initFields() {
+ entityId_ = "";
+ shardId_ = "";
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasEntityId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasShardId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(1, getEntityIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBytes(2, getShardIdBytes());
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeBytesSize(1, getEntityIdBytes());
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeBytesSize(2, getShardIdBytes());
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code StartEntityAck}
+ */
+ public static final class Builder extends
+ akka.protobuf.GeneratedMessage.Builder<Builder>
+ implements akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAckOrBuilder {
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.Builder.class);
+ }
+
+ // Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ entityId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000001);
+ shardId_ = "";
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public akka.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_StartEntityAck_descriptor;
+ }
+
+ public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck getDefaultInstanceForType() {
+ return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.getDefaultInstance();
+ }
+
+ public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck build() {
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck buildPartial() {
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.entityId_ = entityId_;
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.shardId_ = shardId_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(akka.protobuf.Message other) {
+ if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck) {
+ return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck other) {
+ if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck.getDefaultInstance()) return this;
+ if (other.hasEntityId()) {
+ bitField0_ |= 0x00000001;
+ entityId_ = other.entityId_;
+ onChanged();
+ }
+ if (other.hasShardId()) {
+ bitField0_ |= 0x00000002;
+ shardId_ = other.shardId_;
+ onChanged();
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasEntityId()) {
+
+ return false;
+ }
+ if (!hasShardId()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.StartEntityAck) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required string entityId = 1;
+ private java.lang.Object entityId_ = "";
+ /**
+ * required string entityId = 1;
+ */
+ public boolean hasEntityId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public java.lang.String getEntityId() {
+ java.lang.Object ref = entityId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((akka.protobuf.ByteString) ref)
+ .toStringUtf8();
+ entityId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public akka.protobuf.ByteString
+ getEntityIdBytes() {
+ java.lang.Object ref = entityId_;
+ if (ref instanceof String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ entityId_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public Builder setEntityId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ entityId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public Builder clearEntityId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ entityId_ = getDefaultInstance().getEntityId();
+ onChanged();
+ return this;
+ }
+ /**
+ * required string entityId = 1;
+ */
+ public Builder setEntityIdBytes(
+ akka.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ entityId_ = value;
+ onChanged();
+ return this;
+ }
+
+ // required string shardId = 2;
+ private java.lang.Object shardId_ = "";
+ /**
+ * required string shardId = 2;
+ */
+ public boolean hasShardId() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required string shardId = 2;
+ */
+ public java.lang.String getShardId() {
+ java.lang.Object ref = shardId_;
+ if (!(ref instanceof java.lang.String)) {
+ java.lang.String s = ((akka.protobuf.ByteString) ref)
+ .toStringUtf8();
+ shardId_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * required string shardId = 2;
+ */
+ public akka.protobuf.ByteString
+ getShardIdBytes() {
+ java.lang.Object ref = shardId_;
+ if (ref instanceof String) {
+ akka.protobuf.ByteString b =
+ akka.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ shardId_ = b;
+ return b;
+ } else {
+ return (akka.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * required string shardId = 2;
+ */
+ public Builder setShardId(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ shardId_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required string shardId = 2;
+ */
+ public Builder clearShardId() {
+ bitField0_ = (bitField0_ & ~0x00000002);
+ shardId_ = getDefaultInstance().getShardId();
+ onChanged();
+ return this;
+ }
+ /**
+ * required string shardId = 2;
+ */
+ public Builder setShardIdBytes(
+ akka.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000002;
+ shardId_ = value;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:StartEntityAck)
+ }
+
+ static {
+ defaultInstance = new StartEntityAck(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:StartEntityAck)
+ }
+
private static akka.protobuf.Descriptors.Descriptor
internal_static_CoordinatorState_descriptor;
private static
@@ -6209,6 +7331,16 @@ public final class ClusterShardingMessages {
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_ShardStats_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_StartEntity_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_StartEntity_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_StartEntityAck_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_StartEntityAck_fieldAccessorTable;
public static akka.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -6231,8 +7363,10 @@ public final class ClusterShardingMessages {
"ties\030\001 \003(\t\"!\n\rEntityStarted\022\020\n\010entityId\030" +
"\001 \002(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t" +
"\"0\n\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityC" +
- "ount\030\002 \002(\005B&\n\"akka.cluster.sharding.prot" +
- "obuf.msgH\001"
+ "ount\030\002 \002(\005\"\037\n\013StartEntity\022\020\n\010entityId\030\001 " +
+ "\002(\t\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022" +
+ "\017\n\007shardId\030\002 \002(\tB&\n\"akka.cluster.shardin" +
+ "g.protobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6299,6 +7433,18 @@ public final class ClusterShardingMessages {
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ShardStats_descriptor,
new java.lang.String[] { "Shard", "EntityCount", });
+ internal_static_StartEntity_descriptor =
+ getDescriptor().getMessageTypes().get(9);
+ internal_static_StartEntity_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_StartEntity_descriptor,
+ new java.lang.String[] { "EntityId", });
+ internal_static_StartEntityAck_descriptor =
+ getDescriptor().getMessageTypes().get(10);
+ internal_static_StartEntityAck_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_StartEntityAck_descriptor,
+ new java.lang.String[] { "EntityId", "ShardId", });
return null;
}
};
diff --git a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
index a5fab0a382..1493e58662 100644
--- a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
+++ b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
@@ -52,3 +52,12 @@ message ShardStats {
required string shard = 1;
required int32 entityCount = 2;
}
+
+message StartEntity {
+ required string entityId = 1;
+}
+
+message StartEntityAck {
+ required string entityId = 1;
+ required string shardId = 2;
+}
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index 8c27304903..41af79ca98 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -47,7 +47,7 @@ private[akka] object Shard {
sealed trait ShardCommand
/**
- * When an remembering entities and the entity stops without issuing a `Passivate`, we
+ * When remembering entities and the entity stops without issuing a `Passivate`, we
* restart it after a back off using this message.
*/
final case class RestartEntity(entity: EntityId) extends ShardCommand
@@ -170,6 +170,8 @@ private[akka] class Shard(
case Terminated(ref) ⇒ receiveTerminated(ref)
case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg)
case msg: ShardCommand ⇒ receiveShardCommand(msg)
+ case msg: ShardRegion.StartEntity ⇒ receiveStartEntity(msg)
+ case msg: ShardRegion.StartEntityAck ⇒ receiveStartEntityAck(msg)
case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg)
case msg: ShardQuery ⇒ receiveShardQuery(msg)
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
@@ -177,7 +179,27 @@ private[akka] class Shard(
def receiveShardCommand(msg: ShardCommand): Unit = msg match {
case RestartEntity(id) ⇒ getEntity(id)
- case RestartEntities(ids) ⇒ ids foreach getEntity
+ case RestartEntities(ids) ⇒ restartEntities(ids)
+ }
+
+ def receiveStartEntity(start: ShardRegion.StartEntity): Unit = {
+ log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", sender(), start.entityId, shardId)
+ getEntity(start.entityId)
+ sender() ! ShardRegion.StartEntityAck(start.entityId, shardId)
+ }
+
+ def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = {
+ if (ack.shardId != shardId && state.entities.contains(ack.entityId)) {
+ log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId)
+ processChange(EntityStopped(ack.entityId)) { _ ⇒
+ state = state.copy(state.entities - ack.entityId)
+ messageBuffers.remove(ack.entityId)
+ }
+ }
+ }
+
+ def restartEntities(ids: Set[EntityId]): Unit = {
+ context.actorOf(RememberEntityStarter.props(typeName, shardId, ids, settings, sender()))
}
def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match {
@@ -318,6 +340,63 @@ private[akka] class Shard(
}
}
+private[akka] object RememberEntityStarter {
+ def props(
+ typeName: String,
+ shardId: ShardRegion.ShardId,
+ ids: Set[ShardRegion.EntityId],
+ settings: ClusterShardingSettings,
+ requestor: ActorRef) =
+ Props(new RememberEntityStarter(typeName, shardId, ids, settings, requestor))
+}
+
+/**
+ * INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled
+ */
+private[akka] class RememberEntityStarter(
+ typeName: String,
+ shardId: ShardRegion.ShardId,
+ ids: Set[ShardRegion.EntityId],
+ settings: ClusterShardingSettings,
+ requestor: ActorRef
+) extends Actor {
+
+ import context.dispatcher
+ import scala.concurrent.duration._
+
+ case object Tick
+
+ val region = ClusterSharding(context.system).shardRegion(typeName)
+ var waitingForAck = ids
+
+ sendStart(ids)
+
+ val tickTask = {
+ val resendInterval = settings.tuningParameters.retryInterval
+ context.system.scheduler.schedule(resendInterval, resendInterval, self, Tick)
+ }
+
+ def sendStart(ids: Set[ShardRegion.EntityId]): Unit = {
+ ids.foreach(id ⇒ region ! ShardRegion.StartEntity(id))
+ }
+
+ override def receive = {
+ case ack: ShardRegion.StartEntityAck ⇒
+ waitingForAck -= ack.entityId
+ // inform whoever requested the start that it happened
+ requestor ! ack
+ if (waitingForAck.isEmpty) context.stop(self)
+
+ case Tick ⇒
+ sendStart(waitingForAck)
+
+ }
+
+ override def postStop(): Unit = {
+ tickTask.cancel()
+ }
+}
+
/**
* INTERNAL API: Common things for PersistentShard and DDataShard
*/
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index 1d80818714..27b807b8f8 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -4,6 +4,7 @@
package akka.cluster.sharding
import java.net.URLEncoder
+
import akka.pattern.AskTimeoutException
import akka.util.{ MessageBufferMap, Timeout }
import akka.pattern.{ ask, pipe }
@@ -12,6 +13,7 @@ import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
+
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
@@ -300,6 +302,19 @@ object ShardRegion {
*/
private final case class RestartShard(shardId: ShardId)
+ /**
+ * When remembering entities and a shard is started, each entity id that needs to
+ * be running will trigger this message being sent through sharding. For this to work
+ * the message *must* be handled by the shard id extractor.
+ */
+ final case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable
+
+ /**
+ * Sent back when a `ShardRegion.StartEntity` message was received and triggered the entity
+ * to start (it does not guarantee the entity successfully started)
+ */
+ final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId) extends ClusterShardingSerializable
+
private def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
@@ -438,6 +453,7 @@ private[akka] class ShardRegion(
case cmd: ShardRegionCommand ⇒ receiveCommand(cmd)
case query: ShardRegionQuery ⇒ receiveQuery(query)
case msg: RestartShard ⇒ deliverMessage(msg, sender())
+ case msg: StartEntity ⇒ deliverStartEntity(msg, sender())
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
case unknownMsg ⇒ log.warning("Message does not have an extractor defined in shard [{}] so it was ignored: {}", typeName, unknownMsg)
}
@@ -699,6 +715,15 @@ private[akka] class ShardRegion(
retryCount = 0
}
+ def deliverStartEntity(msg: StartEntity, snd: ActorRef): Unit = {
+ try {
+ deliverMessage(msg, snd)
+ } catch {
+ case ex: MatchError ⇒
+ log.error(ex, "When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).")
+ }
+ }
+
def deliverMessage(msg: Any, snd: ActorRef): Unit =
msg match {
case RestartShard(shardId) ⇒
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
index 806075cc28..03442a42c0 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
@@ -10,7 +10,6 @@ import java.util.zip.GZIPOutputStream
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.breakOut
-
import akka.actor.ActorRef
import akka.actor.ExtendedActorSystem
import akka.cluster.sharding.Shard
@@ -18,11 +17,12 @@ import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages ⇒ sm }
import akka.serialization.BaseSerializer
import akka.serialization.Serialization
-import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import akka.protobuf.MessageLite
import java.io.NotSerializableException
+import akka.cluster.sharding.ShardRegion.{ StartEntity, StartEntityAck }
+
/**
* INTERNAL API: Protobuf serializer of ClusterSharding messages.
*/
@@ -59,6 +59,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
private val EntityStartedManifest = "CB"
private val EntityStoppedManifest = "CD"
+ private val StartEntityManifest = "EA"
+ private val StartEntityAckManifest = "EB"
+
private val GetShardStatsManifest = "DA"
private val ShardStatsManifest = "DB"
@@ -89,7 +92,11 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
GracefulShutdownReqManifest → { bytes ⇒ GracefulShutdownReq(actorRefMessageFromBinary(bytes)) },
GetShardStatsManifest → { bytes ⇒ GetShardStats },
- ShardStatsManifest → { bytes ⇒ shardStatsFromBinary(bytes) })
+ ShardStatsManifest → { bytes ⇒ shardStatsFromBinary(bytes) },
+
+ StartEntityManifest → { startEntityFromBinary(_) },
+ StartEntityAckManifest → { startEntityAckFromBinary(_) }
+ )
override def manifest(obj: AnyRef): String = obj match {
case _: EntityState ⇒ EntityStateManifest
@@ -117,6 +124,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
case _: ShardStopped ⇒ ShardStoppedManifest
case _: GracefulShutdownReq ⇒ GracefulShutdownReqManifest
+ case _: StartEntity ⇒ StartEntityManifest
+ case _: StartEntityAck ⇒ StartEntityAckManifest
+
case GetShardStats ⇒ GetShardStatsManifest
case _: ShardStats ⇒ ShardStatsManifest
case _ ⇒
@@ -146,12 +156,15 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
case GracefulShutdownReq(ref) ⇒
actorRefMessageToProto(ref).toByteArray
- case m: EntityState ⇒ entityStateToProto(m).toByteArray
- case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray
- case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray
+ case m: EntityState ⇒ entityStateToProto(m).toByteArray
+ case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray
+ case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray
- case GetShardStats ⇒ Array.emptyByteArray
- case m: ShardStats ⇒ shardStatsToProto(m).toByteArray
+ case s: StartEntity ⇒ startEntityToByteArray(s)
+ case s: StartEntityAck ⇒ startEntityAckToByteArray(s)
+
+ case GetShardStats ⇒ Array.emptyByteArray
+ case m: ShardStats ⇒ shardStatsToProto(m).toByteArray
case _ ⇒
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
@@ -266,6 +279,29 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
ShardStats(parsed.getShard, parsed.getEntityCount)
}
+ private def startEntityToByteArray(s: StartEntity): Array[Byte] = {
+ val builder = sm.StartEntity.newBuilder()
+ builder.setEntityId(s.entityId)
+ builder.build().toByteArray
+ }
+
+ private def startEntityFromBinary(bytes: Array[Byte]): StartEntity = {
+ val se = sm.StartEntity.parseFrom(bytes)
+ StartEntity(se.getEntityId)
+ }
+
+ private def startEntityAckToByteArray(s: StartEntityAck): Array[Byte] = {
+ val builder = sm.StartEntityAck.newBuilder()
+ builder.setEntityId(s.entityId)
+ builder.setShardId(s.shardId)
+ builder.build().toByteArray
+ }
+
+ private def startEntityAckFromBinary(bytes: Array[Byte]): StartEntityAck = {
+ val sea = sm.StartEntityAck.parseFrom(bytes)
+ StartEntityAck(sea.getEntityId, sea.getShardId)
+ }
+
private def resolveActorRef(path: String): ActorRef = {
system.provider.resolveActorRef(path)
}
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala
new file mode 100644
index 0000000000..01a43a1616
--- /dev/null
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala
@@ -0,0 +1,296 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+package akka.cluster.sharding
+
+import java.io.File
+
+import akka.actor._
+import akka.cluster.{ Cluster, MemberStatus }
+import akka.persistence.Persistence
+import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+
+import scala.concurrent.duration._
+
+object ClusterShardingRememberEntitiesNewExtractorSpec {
+
+ final case class Started(ref: ActorRef)
+
+ def props(probe: Option[ActorRef]): Props = Props(new TestEntity(probe))
+
+ class TestEntity(probe: Option[ActorRef]) extends Actor with ActorLogging {
+ log.info("Entity started: " + self.path)
+ probe.foreach(_ ! Started(self))
+
+ def receive = {
+ case m ⇒ sender() ! m
+ }
+ }
+
+ val shardCount = 5
+
+ val extractEntityId: ShardRegion.ExtractEntityId = {
+ case id: Int ⇒ (id.toString, id)
+ }
+
+ val extractShardId1: ShardRegion.ExtractShardId = {
+ case id: Int ⇒ (id % shardCount).toString
+ case ShardRegion.StartEntity(id) ⇒ extractShardId1(id.toInt)
+ }
+
+ val extractShardId2: ShardRegion.ExtractShardId = {
+ // always bump it one shard id
+ case id: Int ⇒ ((id + 1) % shardCount).toString
+ case ShardRegion.StartEntity(id) ⇒ extractShardId2(id.toInt)
+ }
+
+}
+
+abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: String) extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+ val fourth = role("fourth")
+ val fifth = role("fifth")
+
+ commonConfig(ConfigFactory.parseString(s"""
+ akka.loglevel = DEBUG
+ akka.actor.provider = "cluster"
+ akka.cluster.auto-down-unreachable-after = 0s
+ akka.remote.log-remote-lifecycle-events = off
+ akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
+ akka.persistence.journal.leveldb-shared {
+ timeout = 5s
+ store {
+ native = off
+ dir = "target/ShardingRememberEntitiesNewExtractorSpec/journal"
+ }
+ }
+ akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+ akka.persistence.snapshot-store.local.dir = "target/ShardingRememberEntitiesNewExtractorSpec/snapshots"
+ akka.cluster.sharding.state-store-mode = "$mode"
+ akka.cluster.sharding.distributed-data.durable.lmdb {
+ dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-ddata
+ map-size = 10 MiB
+ }
+ """))
+
+ val roleConfig = ConfigFactory.parseString(
+ """
+ akka.cluster.roles = [sharding]
+ """)
+
+ // we pretend node 4 and 5 are new incarnations of node 2 and 3 as they never run in parallel
+ // so we can use the same lmdb store for them and have node 4 pick up the persisted data of node 2
+ val ddataNodeAConfig = ConfigFactory.parseString(
+ """
+ akka.cluster.sharding.distributed-data.durable.lmdb {
+ dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-node-a
+ }
+ """)
+ val ddataNodeBConfig = ConfigFactory.parseString(
+ """
+ akka.cluster.sharding.distributed-data.durable.lmdb {
+ dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-node-b
+ }
+ """)
+
+ nodeConfig(second)(roleConfig.withFallback(ddataNodeAConfig))
+ nodeConfig(third)(roleConfig.withFallback(ddataNodeBConfig))
+ nodeConfig(fourth)(roleConfig.withFallback(ddataNodeAConfig))
+ nodeConfig(fifth)(roleConfig.withFallback(ddataNodeBConfig))
+
+}
+
+object PersistentClusterShardingRememberEntitiesSpecNewExtractorConfig extends ClusterShardingRememberEntitiesNewExtractorSpecConfig(
+ ClusterShardingSettings.StateStoreModePersistence)
+object DDataClusterShardingRememberEntitiesNewExtractorSpecConfig extends ClusterShardingRememberEntitiesNewExtractorSpecConfig(
+ ClusterShardingSettings.StateStoreModeDData)
+
+class PersistentClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec(
+ PersistentClusterShardingRememberEntitiesSpecNewExtractorConfig)
+
+class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
+class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
+class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
+class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
+class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
+
+class DDataClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec(
+ DDataClusterShardingRememberEntitiesNewExtractorSpecConfig)
+
+class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
+class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
+class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
+class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
+class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
+
+abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterShardingRememberEntitiesNewExtractorSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
+ import ClusterShardingRememberEntitiesNewExtractorSpec._
+ import config._
+
+ val typeName = "Entity"
+
+ override def initialParticipants = roles.size
+
+ val storageLocations = List(new File(system.settings.config.getString(
+ "akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
+
+ override protected def atStartup() {
+ storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
+ enterBarrier("startup")
+ }
+
+ override protected def afterTermination() {
+ storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
+ }
+
+ def join(from: RoleName, to: RoleName): Unit = {
+ runOn(from) {
+ Cluster(system) join node(to).address
+ }
+ enterBarrier(from.name + "-joined")
+ }
+
+ val cluster = Cluster(system)
+
+ def startShardingWithExtractor1(): Unit = {
+ ClusterSharding(system).start(
+ typeName = typeName,
+ entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(None),
+ settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"),
+ extractEntityId = extractEntityId,
+ extractShardId = extractShardId1)
+ }
+
+ def startShardingWithExtractor2(): Unit = {
+ ClusterSharding(system).start(
+ typeName = typeName,
+ entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(testActor)),
+ settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"),
+ extractEntityId = extractEntityId,
+ extractShardId = extractShardId2)
+ }
+
+ lazy val region = ClusterSharding(system).shardRegion(typeName)
+
+ def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
+
+ s"Cluster with min-nr-of-members using sharding ($mode)" must {
+
+ if (!isDdataMode) {
+ "setup shared journal" in {
+ // start the Persistence extension
+ Persistence(system)
+ runOn(first) {
+ system.actorOf(Props[SharedLeveldbStore], "store")
+ }
+ enterBarrier("persistence-started")
+
+ runOn(second, third, fourth, fifth) {
+ system.actorSelection(node(first) / "user" / "store") ! Identify(None)
+ val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
+ SharedLeveldbJournal.setStore(sharedStore, system)
+ }
+
+ enterBarrier("after-1")
+ }
+ }
+
+ "start up first cluster and sharding" in within(15.seconds) {
+ join(first, first)
+ join(second, first)
+ join(third, first)
+
+ runOn(first, second, third) {
+ within(remaining) {
+ awaitAssert {
+ cluster.state.members.count(_.status == MemberStatus.Up) should ===(3)
+ }
+ }
+ }
+ runOn(second, third) {
+ startShardingWithExtractor1()
+ }
+ enterBarrier("first-cluster-up")
+
+ runOn(second, third) {
+ // one entity for each shard id
+ (1 to 10).foreach { n ⇒
+ region ! n
+ expectMsg(n)
+ }
+ }
+ enterBarrier("first-cluster-entities-up")
+ }
+
+ "shutdown sharding nodes" in within(30.seconds) {
+ runOn(first) {
+ testConductor.exit(second, 0).await
+ testConductor.exit(third, 0).await
+ }
+ runOn(first) {
+ within(remaining) {
+ awaitAssert {
+ cluster.state.members.count(_.status == MemberStatus.Up) should ===(1)
+ }
+ }
+ }
+ enterBarrier("first-sharding-cluster-stopped")
+ }
+
+ "start new nodes with different extractor" in within(15.seconds) {
+
+ // start it with a new shard id extractor, which will put the entities
+ // on different shards
+
+ join(fourth, first)
+ join(fifth, first)
+ runOn(first) {
+ within(remaining) {
+ awaitAssert {
+ cluster.state.members.count(_.status == MemberStatus.Up) should ===(3)
+ }
+ }
+ }
+ runOn(fourth, fifth) {
+ startShardingWithExtractor2()
+ }
+
+ // TODO how do we know that the shards have started?
+ Thread.sleep(7000)
+ enterBarrier("new-nodes-started")
+ }
+
+ "have the remembered entities running on the right shards" in within(15.seconds) {
+ runOn(fourth, fifth) {
+ var stats: ShardRegion.CurrentShardRegionState = null
+ within(remaining) {
+ awaitAssert {
+ region ! ShardRegion.GetShardRegionState
+ val reply = expectMsgType[ShardRegion.CurrentShardRegionState]
+ reply.shards should not be empty
+ stats = reply
+ }
+ }
+
+ for {
+ shardState ← stats.shards
+ entityId ← shardState.entityIds
+ } {
+ val calculatedShardId = extractShardId2(entityId.toInt)
+ calculatedShardId should ===(shardState.shardId)
+ }
+ }
+
+ enterBarrier("done")
+ }
+
+ }
+}
+
diff --git a/akka-cluster-sharding/src/test/resources/reference.conf b/akka-cluster-sharding/src/test/resources/reference.conf
index eebfbdd0a2..589f6650ea 100644
--- a/akka-cluster-sharding/src/test/resources/reference.conf
+++ b/akka-cluster-sharding/src/test/resources/reference.conf
@@ -1,6 +1,6 @@
akka {
actor {
- serialize-creators = on
+ serialize-creators = off
serialize-messages = on
warn-about-java-serializer-usage = off
}
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
index d37c20ba31..80831f0036 100644
--- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
@@ -3,11 +3,10 @@
*/
package akka.cluster.sharding.protobuf
-import akka.actor.{ ExtendedActorSystem }
+import akka.actor.ExtendedActorSystem
import akka.testkit.AkkaSpec
import akka.actor.Props
-import akka.cluster.sharding.ShardCoordinator
-import akka.cluster.sharding.Shard
+import akka.cluster.sharding.{ Shard, ShardCoordinator, ShardRegion }
class ClusterShardingMessageSerializerSpec extends AkkaSpec {
import ShardCoordinator.Internal._
@@ -28,7 +27,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
"ClusterShardingMessageSerializer" must {
- "be able to serializable ShardCoordinator snapshot State" in {
+ "be able to serialize ShardCoordinator snapshot State" in {
val state = State(
shards = Map("a" → region1, "b" → region2, "c" → region2),
regions = Map(region1 → Vector("a"), region2 → Vector("b", "c"), region3 → Vector.empty[String]),
@@ -37,7 +36,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
checkSerialization(state)
}
- "be able to serializable ShardCoordinator domain events" in {
+ "be able to serialize ShardCoordinator domain events" in {
checkSerialization(ShardRegionRegistered(region1))
checkSerialization(ShardRegionProxyRegistered(regionProxy1))
checkSerialization(ShardRegionTerminated(region1))
@@ -46,7 +45,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
checkSerialization(ShardHomeDeallocated("a"))
}
- "be able to serializable ShardCoordinator remote messages" in {
+ "be able to serialize ShardCoordinator remote messages" in {
checkSerialization(Register(region1))
checkSerialization(RegisterProxy(regionProxy1))
checkSerialization(RegisterAck(region1))
@@ -61,21 +60,26 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
checkSerialization(GracefulShutdownReq(region1))
}
- "be able to serializable PersistentShard snapshot state" in {
+ "be able to serialize PersistentShard snapshot state" in {
checkSerialization(Shard.State(Set("e1", "e2", "e3")))
}
- "be able to serializable PersistentShard domain events" in {
+ "be able to serialize PersistentShard domain events" in {
checkSerialization(Shard.EntityStarted("e1"))
checkSerialization(Shard.EntityStopped("e1"))
}
- "be able to serializable GetShardStats" in {
+ "be able to serialize GetShardStats" in {
checkSerialization(Shard.GetShardStats)
}
- "be able to serializable ShardStats" in {
+ "be able to serialize ShardStats" in {
checkSerialization(Shard.ShardStats("a", 23))
}
+
+ "be able to serialize StartEntity" in {
+ checkSerialization(ShardRegion.StartEntity("42"))
+ checkSerialization(ShardRegion.StartEntityAck("13", "37"))
+ }
}
}
diff --git a/akka-docs/src/main/paradox/java/cluster-sharding.md b/akka-docs/src/main/paradox/java/cluster-sharding.md
index d67de743f8..5bfde4acd8 100644
--- a/akka-docs/src/main/paradox/java/cluster-sharding.md
+++ b/akka-docs/src/main/paradox/java/cluster-sharding.md
@@ -267,12 +267,15 @@ are thereafter delivered to a new incarnation of the entity.
The list of entities in each `Shard` can be made persistent (durable) by setting
the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
-`ClusterSharding.start`. When configured to remember entities, whenever a `Shard`
-is rebalanced onto another node or recovers after a crash it will recreate all the
-entities which were previously running in that `Shard`. To permanently stop entities,
-a `Passivate` message must be sent to the parent of the entity actor, otherwise the
-entity will be automatically restarted after the entity restart backoff specified in
-the configuration.
+`ClusterSharding.start` and making sure the `shardIdExtractor` handles
+`ShardRegion.StartEntity(EntityId)` which implies that it must be possible to
+extract a `ShardId` from the `EntityId`.
+
+When configured to remember entities, whenever a `Shard` is rebalanced onto another
+node or recovers after a crash it will recreate all the entities which were previously
+running in that `Shard`. To permanently stop entities, a `Passivate` message must be
+sent to the parent of the entity actor, otherwise the entity will be automatically
+restarted after the entity restart backoff specified in the configuration.
When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
diff --git a/akka-docs/src/main/paradox/scala/cluster-sharding.md b/akka-docs/src/main/paradox/scala/cluster-sharding.md
index 728b0dd4fb..361d66530c 100644
--- a/akka-docs/src/main/paradox/scala/cluster-sharding.md
+++ b/akka-docs/src/main/paradox/scala/cluster-sharding.md
@@ -270,12 +270,15 @@ are thereafter delivered to a new incarnation of the entity.
The list of entities in each `Shard` can be made persistent (durable) by setting
the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
-`ClusterSharding.start`. When configured to remember entities, whenever a `Shard`
-is rebalanced onto another node or recovers after a crash it will recreate all the
-entities which were previously running in that `Shard`. To permanently stop entities,
-a `Passivate` message must be sent to the parent of the entity actor, otherwise the
-entity will be automatically restarted after the entity restart backoff specified in
-the configuration.
+`ClusterSharding.start` and making sure the `shardIdExtractor` handles
+`ShardRegion.StartEntity(EntityId)` which implies that it must be possible to
+extract a `ShardId` from the `EntityId`.
+
+When configured to remember entities, whenever a `Shard` is rebalanced onto another
+node or recovers after a crash it will recreate all the entities which were previously
+running in that `Shard`. To permanently stop entities, a `Passivate` message must be
+sent to the parent of the entity actor, otherwise the entity will be automatically
+restarted after the entity restart backoff specified in the configuration.
When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
diff --git a/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md b/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md
index 20a94abf98..37e34b7c65 100644
--- a/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md
+++ b/akka-docs/src/main/paradox/scala/project/migration-guide-2.4.x-2.5.x.md
@@ -479,6 +479,18 @@ Note that the stored @ref:[Remembering Entities](../cluster-sharding.md#cluster-
be migrated to the `data` mode. Such entities must be started again in some other way when using
`ddata` mode.
+### Cluster Sharding remember entities
+
+To use *remember entities* with cluster sharding there is now an additional requirement: the
+`extractShardId` must be able to extract the shard id from the message `ShardRegion.StartEntity(EntityId)`.
+This implies that it must be possible to calculate a shard id from an entity id when using remember
+entities.
+
+This was added to be able to gracefully handle when persisted locations of entities do not match
+where the entities should live when a shard region starts up. Such states could be caused by changing
+the `extractShardId` logic and restarting a system using *remember entities*.
+
+
### Cluster Management Command Line Tool
There is a new cluster management tool with HTTP API that has the same functionality as the command line tool.