From dba6eec46055e04f93db0d74e0463bfa234fc28d Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Wed, 13 May 2020 10:38:11 +0100 Subject: [PATCH] =?UTF-8?q?Migration=20from=20persistent=20shard=20coordin?= =?UTF-8?q?ator=20to=20ddata=20with=20eventsource=E2=80=A6=20(#29058)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Migration from persistent shard coordinator to ddata with eventsourced remembered entities * Fix bin compat in typed sharding * Add log capturing * Java API for nested case objects in typed sharding settings * Starting some docs for remembering entities store * Snapshot and marker to detect going back to persistence mode * Review feedback * Unused imports --- .../typed/ClusterShardingSettings.scala | 85 +- .../internal/ShardedDaemonProcessImpl.scala | 3 +- .../ClusterShardingSettingsCompileOnly.java | 19 + .../protobuf/msg/ClusterShardingMessages.java | 753 +++++++++++++++++- .../protobuf/ClusterShardingMessages.proto | 5 + .../src/main/resources/reference.conf | 3 +- .../cluster/sharding/ClusterSharding.scala | 12 +- .../sharding/ClusterShardingSettings.scala | 33 +- .../cluster/sharding/ShardCoordinator.scala | 15 +- .../EventSourcedRememberEntities.scala | 2 +- .../internal/EventSourcedRememberShards.scala | 113 ++- .../ClusterShardingMessageSerializer.scala | 32 +- .../PersistentShardingMigrationSpec.scala | 168 ++++ .../ExternalShardAllocationStrategySpec.scala | 1 + .../src/main/paradox/includes/cluster.md | 7 +- .../main/paradox/typed/cluster-sharding.md | 81 +- 16 files changed, 1281 insertions(+), 51 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingSettingsCompileOnly.java create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index a77daeda84..25216e59ea 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration.FiniteDuration import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.cluster.ClusterSettings.DataCenter +import akka.cluster.sharding.typed.ClusterShardingSettings.RememberEntitiesStoreModeDData import akka.cluster.sharding.{ ClusterShardingSettings => ClassicShardingSettings } import akka.cluster.singleton.{ ClusterSingletonManagerSettings => ClassicClusterSingletonManagerSettings } import akka.cluster.typed.Cluster @@ -45,6 +46,7 @@ object ClusterShardingSettings { passivateIdleEntityAfter = classicSettings.passivateIdleEntityAfter, shardRegionQueryTimeout = classicSettings.shardRegionQueryTimeout, stateStoreMode = StateStoreMode.byName(classicSettings.stateStoreMode), + rememberEntitiesStoreMode = RememberEntitiesStoreMode.byName(classicSettings.rememberEntitiesStore), new TuningParameters(classicSettings.tuningParameters), new ClusterSingletonManagerSettings( classicSettings.coordinatorSingletonSettings.singletonName, @@ -61,6 +63,7 @@ object ClusterShardingSettings { journalPluginId = settings.journalPluginId, snapshotPluginId = settings.snapshotPluginId, stateStoreMode = settings.stateStoreMode.name, + rememberEntitiesStore = settings.rememberEntitiesStoreMode.name, passivateIdleEntityAfter = settings.passivateIdleEntityAfter, shardRegionQueryTimeout = settings.shardRegionQueryTimeout, new ClassicShardingSettings.TuningParameters( @@ -97,16 +100,57 @@ object ClusterShardingSettings { if (role == "" || role == null) None else Option(role) sealed trait StateStoreMode { def name: String } + + /** + * Java API + */ + def stateStoreModePersistence(): StateStoreMode = StateStoreModePersistence + + /** + * Java API + */ + def stateStoreModeDdata(): StateStoreMode = StateStoreModePersistence + object StateStoreMode { + def byName(name: String): StateStoreMode = if (name == StateStoreModePersistence.name) StateStoreModePersistence else if (name == StateStoreModeDData.name) StateStoreModeDData else - throw new IllegalArgumentException("Not recognized StateStoreMode, only 'ddata' is supported.") + throw new IllegalArgumentException( + s"Not recognized StateStoreMode, only '${StateStoreModePersistence.name}' and '${StateStoreModeDData.name}' are supported.") } + final case object StateStoreModePersistence extends StateStoreMode { override def name = "persistence" } + final case object StateStoreModeDData extends StateStoreMode { override def name = "ddata" } + /** + * Java API + */ + def rememberEntitiesStoreModeEventSourced(): RememberEntitiesStoreMode = RememberEntitiesStoreModeEventSourced + + /** + * Java API + */ + def rememberEntitiesStoreModeDdata(): RememberEntitiesStoreMode = RememberEntitiesStoreModeDData + + sealed trait RememberEntitiesStoreMode { def name: String } + + object RememberEntitiesStoreMode { + + def byName(name: String): RememberEntitiesStoreMode = + if (name == RememberEntitiesStoreModeEventSourced.name) RememberEntitiesStoreModeEventSourced + else if (name == RememberEntitiesStoreModeDData.name) RememberEntitiesStoreModeDData + else + throw new IllegalArgumentException( + s"Not recognized RememberEntitiesStore, only '${RememberEntitiesStoreModeDData.name}' and '${RememberEntitiesStoreModeEventSourced.name}' are supported.") + } + final case object RememberEntitiesStoreModeEventSourced extends RememberEntitiesStoreMode { + override def name = "eventsourced" + } + final case object RememberEntitiesStoreModeDData extends RememberEntitiesStoreMode { override def name = "ddata" } + // generated using kaze-class final class TuningParameters private ( val bufferSize: Int, @@ -265,15 +309,36 @@ final class ClusterShardingSettings( val passivateIdleEntityAfter: FiniteDuration, val shardRegionQueryTimeout: FiniteDuration, val stateStoreMode: ClusterShardingSettings.StateStoreMode, + val rememberEntitiesStoreMode: ClusterShardingSettings.RememberEntitiesStoreMode, val tuningParameters: ClusterShardingSettings.TuningParameters, val coordinatorSingletonSettings: ClusterSingletonManagerSettings) { - import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModeDData - import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModePersistence - require( - stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData, - s"Unknown 'state-store-mode' [$stateStoreMode], " + - s"valid values are '${StateStoreModeDData.name}' or '${StateStoreModePersistence.name}'") + @deprecated("Use constructor with rememberEntitiesStoreMode", "2.6.6") // FIXME update version once merged + def this( + numberOfShards: Int, + role: Option[String], + dataCenter: Option[DataCenter], + rememberEntities: Boolean, + journalPluginId: String, + snapshotPluginId: String, + passivateIdleEntityAfter: FiniteDuration, + shardRegionQueryTimeout: FiniteDuration, + stateStoreMode: ClusterShardingSettings.StateStoreMode, + tuningParameters: ClusterShardingSettings.TuningParameters, + coordinatorSingletonSettings: ClusterSingletonManagerSettings) = + this( + numberOfShards, + role, + dataCenter, + rememberEntities, + journalPluginId, + snapshotPluginId, + passivateIdleEntityAfter, + shardRegionQueryTimeout, + stateStoreMode, + RememberEntitiesStoreModeDData, + tuningParameters, + coordinatorSingletonSettings) /** * INTERNAL API @@ -308,6 +373,10 @@ final class ClusterShardingSettings( def withStateStoreMode(stateStoreMode: ClusterShardingSettings.StateStoreMode): ClusterShardingSettings = copy(stateStoreMode = stateStoreMode) + def withRememberEntitiesStoreMode( + rememberEntitiesStoreMode: ClusterShardingSettings.RememberEntitiesStoreMode): ClusterShardingSettings = + copy(rememberEntitiesStoreMode = rememberEntitiesStoreMode) + def withPassivateIdleEntityAfter(duration: FiniteDuration): ClusterShardingSettings = copy(passivateIdleEntityAfter = duration) @@ -335,6 +404,7 @@ final class ClusterShardingSettings( journalPluginId: String = journalPluginId, snapshotPluginId: String = snapshotPluginId, stateStoreMode: ClusterShardingSettings.StateStoreMode = stateStoreMode, + rememberEntitiesStoreMode: ClusterShardingSettings.RememberEntitiesStoreMode = rememberEntitiesStoreMode, tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings, passivateIdleEntityAfter: FiniteDuration = passivateIdleEntityAfter, @@ -349,6 +419,7 @@ final class ClusterShardingSettings( passivateIdleEntityAfter, shardRegionQueryTimeout, stateStoreMode, + rememberEntitiesStoreMode, tuningParameters, coordinatorSingletonSettings) } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala index 6eff811961..a809d597b5 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala @@ -14,7 +14,7 @@ import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.cluster.sharding.ShardRegion.EntityId import akka.cluster.sharding.typed.ClusterShardingSettings -import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModeDData +import akka.cluster.sharding.typed.ClusterShardingSettings.{ RememberEntitiesStoreModeDData, StateStoreModeDData } import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.cluster.sharding.typed.scaladsl @@ -130,6 +130,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_]) Duration.Zero, // passivation disabled shardingBaseSettings.shardRegionQueryTimeout, StateStoreModeDData, + RememberEntitiesStoreModeDData, // not used as remembered entities is off shardingBaseSettings.tuningParameters, shardingBaseSettings.coordinatorSingletonSettings) } diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingSettingsCompileOnly.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingSettingsCompileOnly.java new file mode 100644 index 0000000000..7214f1cee1 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingSettingsCompileOnly.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.javadsl; + +import akka.actor.typed.ActorSystem; +import akka.cluster.sharding.typed.ClusterShardingSettings; + +public class ClusterShardingSettingsCompileOnly { + + static void shouldBeUsableFromJava() { + ActorSystem system = null; + ClusterShardingSettings.StateStoreMode mode = ClusterShardingSettings.stateStoreModeDdata(); + ClusterShardingSettings.create(system) + .withStateStoreMode(mode) + .withRememberEntitiesStoreMode(ClusterShardingSettings.rememberEntitiesStoreModeDdata()); + } +} diff --git a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java index 42bb87e579..139dca9c1a 100644 --- a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java +++ b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java @@ -17491,6 +17491,742 @@ public final class ClusterShardingMessages { } + public interface RememberedShardStateOrBuilder extends + // @@protoc_insertion_point(interface_extends:RememberedShardState) + akka.protobufv3.internal.MessageOrBuilder { + + /** + * repeated string shardId = 1; + * @return A list containing the shardId. + */ + java.util.List + getShardIdList(); + /** + * repeated string shardId = 1; + * @return The count of shardId. + */ + int getShardIdCount(); + /** + * repeated string shardId = 1; + * @param index The index of the element to return. + * @return The shardId at the given index. + */ + java.lang.String getShardId(int index); + /** + * repeated string shardId = 1; + * @param index The index of the value to return. + * @return The bytes of the shardId at the given index. + */ + akka.protobufv3.internal.ByteString + getShardIdBytes(int index); + + /** + * optional bool marker = 2; + * @return Whether the marker field is set. + */ + boolean hasMarker(); + /** + * optional bool marker = 2; + * @return The marker. + */ + boolean getMarker(); + } + /** + * Protobuf type {@code RememberedShardState} + */ + public static final class RememberedShardState extends + akka.protobufv3.internal.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:RememberedShardState) + RememberedShardStateOrBuilder { + private static final long serialVersionUID = 0L; + // Use RememberedShardState.newBuilder() to construct. + private RememberedShardState(akka.protobufv3.internal.GeneratedMessageV3.Builder builder) { + super(builder); + } + private RememberedShardState() { + shardId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance( + akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) { + return new RememberedShardState(); + } + + @java.lang.Override + public final akka.protobufv3.internal.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private RememberedShardState( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields = + akka.protobufv3.internal.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + akka.protobufv3.internal.ByteString bs = input.readBytes(); + if (!((mutable_bitField0_ & 0x00000001) != 0)) { + shardId_ = new akka.protobufv3.internal.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000001; + } + shardId_.add(bs); + break; + } + case 16: { + bitField0_ |= 0x00000001; + marker_ = input.readBool(); + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobufv3.internal.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) != 0)) { + shardId_ = shardId_.getUnmodifiableView(); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_RememberedShardState_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_RememberedShardState_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState.Builder.class); + } + + private int bitField0_; + public static final int SHARDID_FIELD_NUMBER = 1; + private akka.protobufv3.internal.LazyStringList shardId_; + /** + * repeated string shardId = 1; + * @return A list containing the shardId. + */ + public akka.protobufv3.internal.ProtocolStringList + getShardIdList() { + return shardId_; + } + /** + * repeated string shardId = 1; + * @return The count of shardId. + */ + public int getShardIdCount() { + return shardId_.size(); + } + /** + * repeated string shardId = 1; + * @param index The index of the element to return. + * @return The shardId at the given index. + */ + public java.lang.String getShardId(int index) { + return shardId_.get(index); + } + /** + * repeated string shardId = 1; + * @param index The index of the value to return. + * @return The bytes of the shardId at the given index. + */ + public akka.protobufv3.internal.ByteString + getShardIdBytes(int index) { + return shardId_.getByteString(index); + } + + public static final int MARKER_FIELD_NUMBER = 2; + private boolean marker_; + /** + * optional bool marker = 2; + * @return Whether the marker field is set. + */ + public boolean hasMarker() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * optional bool marker = 2; + * @return The marker. + */ + public boolean getMarker() { + return marker_; + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(akka.protobufv3.internal.CodedOutputStream output) + throws java.io.IOException { + for (int i = 0; i < shardId_.size(); i++) { + akka.protobufv3.internal.GeneratedMessageV3.writeString(output, 1, shardId_.getRaw(i)); + } + if (((bitField0_ & 0x00000001) != 0)) { + output.writeBool(2, marker_); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + { + int dataSize = 0; + for (int i = 0; i < shardId_.size(); i++) { + dataSize += computeStringSizeNoTag(shardId_.getRaw(i)); + } + size += dataSize; + size += 1 * getShardIdList().size(); + } + if (((bitField0_ & 0x00000001) != 0)) { + size += akka.protobufv3.internal.CodedOutputStream + .computeBoolSize(2, marker_); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState)) { + return super.equals(obj); + } + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState other = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState) obj; + + if (!getShardIdList() + .equals(other.getShardIdList())) return false; + if (hasMarker() != other.hasMarker()) return false; + if (hasMarker()) { + if (getMarker() + != other.getMarker()) return false; + } + if (!unknownFields.equals(other.unknownFields)) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (getShardIdCount() > 0) { + hash = (37 * hash) + SHARDID_FIELD_NUMBER; + hash = (53 * hash) + getShardIdList().hashCode(); + } + if (hasMarker()) { + hash = (37 * hash) + MARKER_FIELD_NUMBER; + hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean( + getMarker()); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + java.nio.ByteBuffer data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + java.nio.ByteBuffer data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + akka.protobufv3.internal.ByteString data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + akka.protobufv3.internal.ByteString data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom(byte[] data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + byte[] data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseDelimitedFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + akka.protobufv3.internal.CodedInputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parseFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code RememberedShardState} + */ + public static final class Builder extends + akka.protobufv3.internal.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:RememberedShardState) + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardStateOrBuilder { + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_RememberedShardState_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_RememberedShardState_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState.class, akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState.Builder.class); + } + + // Construct using akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobufv3.internal.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + shardId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + marker_ = false; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + @java.lang.Override + public akka.protobufv3.internal.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.internal_static_RememberedShardState_descriptor; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState getDefaultInstanceForType() { + return akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState.getDefaultInstance(); + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState build() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState buildPartial() { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState result = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((bitField0_ & 0x00000001) != 0)) { + shardId_ = shardId_.getUnmodifiableView(); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.shardId_ = shardId_; + if (((from_bitField0_ & 0x00000002) != 0)) { + result.marker_ = marker_; + to_bitField0_ |= 0x00000001; + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(akka.protobufv3.internal.Message other) { + if (other instanceof akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState) { + return mergeFrom((akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState other) { + if (other == akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState.getDefaultInstance()) return this; + if (!other.shardId_.isEmpty()) { + if (shardId_.isEmpty()) { + shardId_ = other.shardId_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureShardIdIsMutable(); + shardId_.addAll(other.shardId_); + } + onChanged(); + } + if (other.hasMarker()) { + setMarker(other.getMarker()); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private akka.protobufv3.internal.LazyStringList shardId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + private void ensureShardIdIsMutable() { + if (!((bitField0_ & 0x00000001) != 0)) { + shardId_ = new akka.protobufv3.internal.LazyStringArrayList(shardId_); + bitField0_ |= 0x00000001; + } + } + /** + * repeated string shardId = 1; + * @return A list containing the shardId. + */ + public akka.protobufv3.internal.ProtocolStringList + getShardIdList() { + return shardId_.getUnmodifiableView(); + } + /** + * repeated string shardId = 1; + * @return The count of shardId. + */ + public int getShardIdCount() { + return shardId_.size(); + } + /** + * repeated string shardId = 1; + * @param index The index of the element to return. + * @return The shardId at the given index. + */ + public java.lang.String getShardId(int index) { + return shardId_.get(index); + } + /** + * repeated string shardId = 1; + * @param index The index of the value to return. + * @return The bytes of the shardId at the given index. + */ + public akka.protobufv3.internal.ByteString + getShardIdBytes(int index) { + return shardId_.getByteString(index); + } + /** + * repeated string shardId = 1; + * @param index The index to set the value at. + * @param value The shardId to set. + * @return This builder for chaining. + */ + public Builder setShardId( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureShardIdIsMutable(); + shardId_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string shardId = 1; + * @param value The shardId to add. + * @return This builder for chaining. + */ + public Builder addShardId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureShardIdIsMutable(); + shardId_.add(value); + onChanged(); + return this; + } + /** + * repeated string shardId = 1; + * @param values The shardId to add. + * @return This builder for chaining. + */ + public Builder addAllShardId( + java.lang.Iterable values) { + ensureShardIdIsMutable(); + akka.protobufv3.internal.AbstractMessageLite.Builder.addAll( + values, shardId_); + onChanged(); + return this; + } + /** + * repeated string shardId = 1; + * @return This builder for chaining. + */ + public Builder clearShardId() { + shardId_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + return this; + } + /** + * repeated string shardId = 1; + * @param value The bytes of the shardId to add. + * @return This builder for chaining. + */ + public Builder addShardIdBytes( + akka.protobufv3.internal.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureShardIdIsMutable(); + shardId_.add(value); + onChanged(); + return this; + } + + private boolean marker_ ; + /** + * optional bool marker = 2; + * @return Whether the marker field is set. + */ + public boolean hasMarker() { + return ((bitField0_ & 0x00000002) != 0); + } + /** + * optional bool marker = 2; + * @return The marker. + */ + public boolean getMarker() { + return marker_; + } + /** + * optional bool marker = 2; + * @param value The marker to set. + * @return This builder for chaining. + */ + public Builder setMarker(boolean value) { + bitField0_ |= 0x00000002; + marker_ = value; + onChanged(); + return this; + } + /** + * optional bool marker = 2; + * @return This builder for chaining. + */ + public Builder clearMarker() { + bitField0_ = (bitField0_ & ~0x00000002); + marker_ = false; + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:RememberedShardState) + } + + // @@protoc_insertion_point(class_scope:RememberedShardState) + private static final akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState(); + } + + public static akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @java.lang.Deprecated public static final akka.protobufv3.internal.Parser + PARSER = new akka.protobufv3.internal.AbstractParser() { + @java.lang.Override + public RememberedShardState parsePartialFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return new RememberedShardState(input, extensionRegistry); + } + }; + + public static akka.protobufv3.internal.Parser parser() { + return PARSER; + } + + @java.lang.Override + public akka.protobufv3.internal.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.RememberedShardState getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + private static final akka.protobufv3.internal.Descriptors.Descriptor internal_static_CoordinatorState_descriptor; private static final @@ -17601,6 +18337,11 @@ public final class ClusterShardingMessages { private static final akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable internal_static_CurrentShardRegionState_fieldAccessorTable; + private static final akka.protobufv3.internal.Descriptors.Descriptor + internal_static_RememberedShardState_descriptor; + private static final + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internal_static_RememberedShardState_fieldAccessorTable; public static akka.protobufv3.internal.Descriptors.FileDescriptor getDescriptor() { @@ -17640,8 +18381,10 @@ public final class ClusterShardingMessages { "\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 \003(\t\"0\n\nSha" + "rdState\022\017\n\007shardId\030\001 \002(\t\022\021\n\tentityIds\030\002 " + "\003(\t\"F\n\027CurrentShardRegionState\022\033\n\006shards" + - "\030\001 \003(\0132\013.ShardState\022\016\n\006failed\030\002 \003(\tB&\n\"a" + - "kka.cluster.sharding.protobuf.msgH\001" + "\030\001 \003(\0132\013.ShardState\022\016\n\006failed\030\002 \003(\t\"7\n\024R" + + "ememberedShardState\022\017\n\007shardId\030\001 \003(\t\022\016\n\006" + + "marker\030\002 \001(\010B&\n\"akka.cluster.sharding.pr" + + "otobuf.msgH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -17779,6 +18522,12 @@ public final class ClusterShardingMessages { akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_CurrentShardRegionState_descriptor, new java.lang.String[] { "Shards", "Failed", }); + internal_static_RememberedShardState_descriptor = + getDescriptor().getMessageTypes().get(21); + internal_static_RememberedShardState_fieldAccessorTable = new + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( + internal_static_RememberedShardState_descriptor, + new java.lang.String[] { "ShardId", "Marker", }); } // @@protoc_insertion_point(outer_class_scope) diff --git a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto index 0ca8d1a251..2d5deae395 100644 --- a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto +++ b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto @@ -117,3 +117,8 @@ message CurrentShardRegionState { repeated ShardState shards = 1; repeated string failed = 2; } + +message RememberedShardState { + repeated string shardId = 1; + optional bool marker = 2; +} diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 31b0fbfb65..d86ca71be2 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -82,7 +82,8 @@ akka.cluster.sharding { # Defines how the coordinator stores its state. Same is also used by the # shards for rememberEntities. - # Valid values are "ddata" or "persistence". + # Valid values are "ddata" or "persistence". + # "persistence" mode is deprecated state-store-mode = "ddata" # The shard saves persistent snapshots after this number of persistent diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 40bc16aec7..9955cb7311 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -758,10 +758,11 @@ private[akka] class ClusterShardingGuardian extends Actor { // with the deprecated persistence state store mode we always use the event sourced provider for shard regions // and no store for coordinator (the coordinator is a PersistentActor in that case) val rememberEntitiesProvider = - if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) + if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) { ClusterShardingSettings.RememberEntitiesStoreEventsourced - // FIXME move to setting - else context.system.settings.config.getString("akka.cluster.sharding.remember-entities-store") + } else { + settings.rememberEntitiesStore + } Some(rememberEntitiesProvider match { case ClusterShardingSettings.RememberEntitiesStoreDData => new DDataRememberEntitiesProvider(typeName, settings, majorityMinCap, rep) @@ -778,11 +779,12 @@ private[akka] class ClusterShardingGuardian extends Actor { val shardRegion = context.child(encName).getOrElse { if (context.child(cName).isEmpty) { val coordinatorProps = - if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) + if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) { ShardCoordinator.props(typeName, settings, allocationStrategy) - else + } else { ShardCoordinator .props(typeName, settings, allocationStrategy, rep, majorityMinCap, rememberEntitiesStoreProvider) + } val singletonProps = BackoffOpts .onStop( diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 5aac6121ef..5d8433b789 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -90,6 +90,7 @@ object ClusterShardingSettings { journalPluginId = config.getString("journal-plugin-id"), snapshotPluginId = config.getString("snapshot-plugin-id"), stateStoreMode = config.getString("state-store-mode"), + rememberEntitiesStore = config.getString("remember-entities-store"), passivateIdleEntityAfter = passivateIdleAfter, shardRegionQueryTimeout = config.getDuration("shard-region-query-timeout", MILLISECONDS).millis, tuningParameters, @@ -238,6 +239,7 @@ final class ClusterShardingSettings( val journalPluginId: String, val snapshotPluginId: String, val stateStoreMode: String, + val rememberEntitiesStore: String, val passivateIdleEntityAfter: FiniteDuration, val shardRegionQueryTimeout: FiniteDuration, val tuningParameters: ClusterShardingSettings.TuningParameters, @@ -245,6 +247,34 @@ final class ClusterShardingSettings( val leaseSettings: Option[LeaseUsageSettings]) extends NoSerializationVerificationNeeded { + @deprecated( + "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStoreinstead", + "2.6.6" // TODO update once merged + ) + def this( + role: Option[String], + rememberEntities: Boolean, + journalPluginId: String, + snapshotPluginId: String, + stateStoreMode: String, + passivateIdleEntityAfter: FiniteDuration, + shardRegionQueryTimeout: FiniteDuration, + tuningParameters: ClusterShardingSettings.TuningParameters, + coordinatorSingletonSettings: ClusterSingletonManagerSettings, + leaseSettings: Option[LeaseUsageSettings]) = + this( + role, + rememberEntities, + journalPluginId, + snapshotPluginId, + stateStoreMode, + "ddata", + passivateIdleEntityAfter, + shardRegionQueryTimeout, + tuningParameters, + coordinatorSingletonSettings, + leaseSettings) + // bin compat for 2.5.23 @deprecated( "Use the ClusterShardingSettings factory methods or the constructor including shardRegionQueryTimeout instead", @@ -269,7 +299,7 @@ final class ClusterShardingSettings( 3.seconds, tuningParameters, coordinatorSingletonSettings, - None) + leaseSettings) // bin compat for 2.5.21 @deprecated( @@ -392,6 +422,7 @@ final class ClusterShardingSettings( journalPluginId, snapshotPluginId, stateStoreMode, + rememberEntitiesStore, passivateIdleAfter, shardRegionQueryTimeout, tuningParameters, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index a8c823f424..c15eed411c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -22,8 +22,12 @@ import akka.persistence._ import akka.cluster.ClusterEvent import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.sharding.ShardRegion.ShardId -import akka.cluster.sharding.internal.RememberEntitiesCoordinatorStore -import akka.cluster.sharding.internal.RememberEntitiesProvider +import akka.cluster.sharding.internal.EventSourcedRememberShards.MigrationMarker +import akka.cluster.sharding.internal.{ + EventSourcedRememberShards, + RememberEntitiesCoordinatorStore, + RememberEntitiesProvider +} import akka.event.BusLogging import akka.event.Logging import akka.util.PrettyDuration._ @@ -343,7 +347,8 @@ object ShardCoordinator { } /** - * Persistent state of the event sourced ShardCoordinator. + * State of the shard coordinator. + * Was also used as the persistent state in the old persistent coordinator. */ @SerialVersionUID(1L) final case class State private[akka] ( // region for each shard @@ -351,6 +356,7 @@ object ShardCoordinator { // shards for each region regions: Map[ActorRef, Vector[ShardId]] = Map.empty, regionProxies: Set[ActorRef] = Set.empty, + // Only used if remembered entities is enabled unallocatedShards: Set[ShardId] = Set.empty, rememberEntities: Boolean = false) extends ClusterShardingSerializable { @@ -991,6 +997,9 @@ class PersistentShardCoordinator( override def snapshotPluginId: String = settings.snapshotPluginId override def receiveRecover: Receive = { + case MigrationMarker | SnapshotOffer(_, _: EventSourcedRememberShards.State) => + throw new IllegalStateException( + "state-store is set to persistence but a migration has taken place to remember-entities-store=eventsourced. You can not downgrade.") case evt: DomainEvent => log.debug("receiveRecover {}", evt) evt match { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala index f5484c121a..ff8783b9c5 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala @@ -37,7 +37,7 @@ private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, // Note that this one is never used for the deprecated persistent state store mode, only when state store is ddata // combined with eventsourced remember entities storage override def coordinatorStoreProps(): Props = - EventSourcedRememberShards.props(typeName) + EventSourcedRememberShards.props(typeName, settings) } /** diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala index cdd4264943..0e649a72e7 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala @@ -4,45 +4,136 @@ package akka.cluster.sharding.internal -import akka.actor.Props +import akka.actor.{ ActorLogging, Props } import akka.annotation.InternalApi +import akka.cluster.sharding.{ ClusterShardingSerializable, ClusterShardingSettings } +import akka.cluster.sharding.ShardCoordinator.Internal +import akka.cluster.sharding.ShardCoordinator.Internal.ShardHomeAllocated import akka.cluster.sharding.ShardRegion.ShardId -import akka.persistence.PersistentActor +import akka.persistence._ +import akka.persistence.journal.{ EventAdapter, EventSeq } + +import scala.collection.mutable /** * INTERNAL API */ @InternalApi private[akka] object EventSourcedRememberShards { - def props(typeName: String): Props = - Props(new EventSourcedRememberShards(typeName)) + def props(typeName: String, settings: ClusterShardingSettings): Props = + Props(new EventSourcedRememberShards(typeName, settings)) + + class FromOldCoordinatorState() extends EventAdapter { + override def manifest(event: Any): String = + "" + + override def toJournal(event: Any): Any = + event + + override def fromJournal(event: Any, manifest: String): EventSeq = { + event match { + case ShardHomeAllocated(shardId, _) => + EventSeq.single(shardId) + case _ => EventSeq.empty + } + + } + } + + case class State(shards: Set[ShardId], writtenMigrationMarker: Boolean = false) extends ClusterShardingSerializable + + case object MigrationMarker extends ClusterShardingSerializable } /** * INTERNAL API */ @InternalApi -private[akka] final class EventSourcedRememberShards(typeName: String) extends PersistentActor { +private[akka] final class EventSourcedRememberShards(typeName: String, settings: ClusterShardingSettings) + extends PersistentActor + with ActorLogging { - override val persistenceId: String = s"$typeName-remember-entitites" + import EventSourcedRememberShards._ - private var shards = Set.empty[ShardId] + // Uses the same persistence id as the old persistent coordinator so that the old data can be migrated + // without any user action + override def persistenceId = s"/sharding/${typeName}Coordinator" + override def journalPluginId: String = settings.journalPluginId + override def snapshotPluginId: String = settings.snapshotPluginId + + private val shards = mutable.Set.empty[ShardId] + private var writtenMarker = false override def receiveRecover: Receive = { case shardId: ShardId => - // FIXME optimize for adding rather than reading (which is done only once) - shards += shardId + shards.add(shardId) + case SnapshotOffer(_, state: Internal.State) => + shards ++= (state.shards.keys ++ state.unallocatedShards) + case SnapshotOffer(_, State(shardIds, marker)) => + shards ++= shardIds + writtenMarker = marker + case RecoveryCompleted => + log.debug("Recovery complete. Current shards {}. Written Marker {}", shards, writtenMarker) + if (!writtenMarker) { + persist(MigrationMarker) { _ => + log.debug("Written migration marker") + writtenMarker = true + } + } + case MigrationMarker => + writtenMarker = true + case other => + log.error( + "Unexpected message type [{}]. Are you migrating from persistent coordinator state store? If so you must add the migration event adapter. Shards will not be restarted.", + other.getClass) } override def receiveCommand: Receive = { case RememberEntitiesCoordinatorStore.GetShards => - sender() ! RememberEntitiesCoordinatorStore.RememberedShards(shards) + sender() ! RememberEntitiesCoordinatorStore.RememberedShards(shards.toSet) case RememberEntitiesCoordinatorStore.AddShard(shardId: ShardId) => persistAsync(shardId) { shardId => - shards += shardId + shards.add(shardId) sender() ! RememberEntitiesCoordinatorStore.UpdateDone(shardId) + saveSnapshotWhenNeeded() } + + case e: SaveSnapshotSuccess => + log.debug("Snapshot saved successfully") + internalDeleteMessagesBeforeSnapshot( + e, + settings.tuningParameters.keepNrOfBatches, + settings.tuningParameters.snapshotAfter) + + case SaveSnapshotFailure(_, reason) => + log.warning("Snapshot failure: [{}]", reason.getMessage) + + case DeleteMessagesSuccess(toSequenceNr) => + val deleteTo = toSequenceNr - 1 + val deleteFrom = + math.max(0, deleteTo - (settings.tuningParameters.keepNrOfBatches * settings.tuningParameters.snapshotAfter)) + log.debug( + "Messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]", + toSequenceNr, + deleteFrom, + deleteTo) + deleteSnapshots(SnapshotSelectionCriteria(minSequenceNr = deleteFrom, maxSequenceNr = deleteTo)) + + case DeleteMessagesFailure(reason, toSequenceNr) => + log.warning("Messages to [{}] deletion failure: [{}]", toSequenceNr, reason.getMessage) + + case DeleteSnapshotsSuccess(m) => + log.debug("Snapshots matching [{}] deleted successfully", m) + + case DeleteSnapshotsFailure(m, reason) => + log.warning("Snapshots matching [{}] deletion failure: [{}]", m, reason.getMessage) } + def saveSnapshotWhenNeeded(): Unit = { + if (lastSequenceNr % settings.tuningParameters.snapshotAfter == 0 && lastSequenceNr != 0) { + log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr) + saveSnapshot(State(shards.toSet, writtenMarker)) + } + } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index 10af80a1a5..91beeb0552 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -19,6 +19,7 @@ import akka.actor.ExtendedActorSystem import akka.cluster.sharding.Shard import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm } +import akka.cluster.sharding.internal.EventSourcedRememberShards.{ MigrationMarker, State => RememberShardsState } import akka.serialization.BaseSerializer import akka.serialization.Serialization import akka.serialization.SerializerWithStringManifest @@ -91,6 +92,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private val ShardStateManifest = "FD" private val CurrentShardRegionStateManifest = "FE" + private val EventSourcedRememberShardsMigrationMarkerManifest = "SM" + private val EventSourcedRememberShardsState = "SS" + private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] => AnyRef]( EntityStateManifest -> entityStateFromBinary, EntityStartedManifest -> entityStartedFromBinary, @@ -170,8 +174,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy CurrentRegionsManifest -> { bytes => currentRegionsFromBinary(bytes) }, - StartEntityManifest -> { startEntityFromBinary(_) }, - StartEntityAckManifest -> { startEntityAckFromBinary(_) }, + StartEntityManifest -> { startEntityFromBinary }, + StartEntityAckManifest -> { startEntityAckFromBinary }, GetCurrentShardStateManifest -> { _ => GetCurrentShardState }, @@ -186,6 +190,12 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy }, CurrentShardRegionStateManifest -> { bytes => currentShardRegionStateFromBinary(bytes) + }, + EventSourcedRememberShardsMigrationMarkerManifest -> { _ => + MigrationMarker + }, + EventSourcedRememberShardsState -> { bytes => + rememberShardsStateFromBinary(bytes) }) override def manifest(obj: AnyRef): String = obj match { @@ -232,6 +242,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy case _: ShardState => ShardStateManifest case _: CurrentShardRegionState => CurrentShardRegionStateManifest + case MigrationMarker => EventSourcedRememberShardsMigrationMarkerManifest + case _: RememberShardsState => EventSourcedRememberShardsState + case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } @@ -281,6 +294,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy case m: ShardState => shardStateToProto(m).toByteArray case m: CurrentShardRegionState => currentShardRegionStateToProto(m).toByteArray + case MigrationMarker => Array.emptyByteArray + case m: RememberShardsState => rememberShardsStateToProto(m).toByteArray + case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } @@ -293,6 +309,18 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } + private def rememberShardsStateToProto(state: RememberShardsState): sm.RememberedShardState = { + val builder = sm.RememberedShardState.newBuilder() + builder.addAllShardId(state.shards.toList.asJava) + builder.setMarker(state.writtenMigrationMarker) + builder.build() + } + + private def rememberShardsStateFromBinary(bytes: Array[Byte]): RememberShardsState = { + val proto = sm.RememberedShardState.parseFrom(bytes) + RememberShardsState(proto.getShardIdList.asScala.toSet, proto.getMarker) + } + private def coordinatorStateToProto(state: State): sm.CoordinatorState = { val builder = sm.CoordinatorState.newBuilder() diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala new file mode 100644 index 0000000000..7ad8b53d1b --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding +import java.util.UUID + +import akka.actor.{ ActorRef, ActorSystem, Props } +import akka.cluster.Cluster +import akka.persistence.PersistentActor +import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * Test migration from old persistent shard coordinator with remembered + * entities to using a ddatabacked shard coordinator with an event sourced + * replicated entity store. + */ +object PersistentShardingMigrationSpec { + val config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.artery.canonical.port = 0 + akka.cluster.sharding { + remember-entities = on + remember-entities-store = "eventsourced" + + # this forces the remembered entity store to use persistence + # is is deprecated + state-store-mode = "persistence" + + # make sure we test snapshots + snapshot-after = 5 + } + + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/PersistentShardingMigrationSpec-${UUID + .randomUUID() + .toString}" + akka.persistence.journal.leveldb { + native = off + dir = "target/journal-PersistentShardingMigrationSpec-${UUID.randomUUID()}" + } + """) + + val configForNewMode = ConfigFactory + .parseString( + """ + akka.cluster.sharding { + remember-entities = on + remember-entities-store = "eventsourced" + state-store-mode = "ddata" + } + + akka.persistence.journal.leveldb { + event-adapters { + coordinator-migration = "akka.cluster.sharding.internal.EventSourcedRememberShards$FromOldCoordinatorState" + } + + event-adapter-bindings { + "akka.cluster.sharding.ShardCoordinator$Internal$DomainEvent" = coordinator-migration + } + } + + """) + .withFallback(config) + + case class Message(id: Long) + + class PA extends PersistentActor { + override def persistenceId: String = "pa-" + self.path.name + override def receiveRecover: Receive = { + case _ => + } + override def receiveCommand: Receive = { + case _ => + sender() ! "ack" + } + } + + val extractEntityId: ShardRegion.ExtractEntityId = { + case msg @ Message(id) => (id.toString, msg) + } + + def extractShardId(probe: ActorRef): ShardRegion.ExtractShardId = { + case Message(id) => id.toString + case ShardRegion.StartEntity(id) => + // StartEntity is used by remembering entities feature + probe ! id + id + } +} + +class PersistentShardingMigrationSpec extends AkkaSpec(PersistentShardingMigrationSpec.config) with ImplicitSender { + + import PersistentShardingMigrationSpec._ + + "Migration" should { + "allow migration of remembered shards and now allow going back" in { + val typeName = "Migration" + + withSystem(config, typeName, "OldMode") { (_, region, _) => + region ! Message(1) + expectMsg("ack") + region ! Message(2) + expectMsg("ack") + region ! Message(3) + expectMsg("ack") + } + + withSystem(configForNewMode, typeName, "NewMode") { (system, region, rememberedEntitiesProbe) => + val probe = TestProbe()(system) + region.tell(Message(1), probe.ref) + probe.expectMsg("ack") + Set( + rememberedEntitiesProbe.expectMsgType[String], + rememberedEntitiesProbe.expectMsgType[String], + rememberedEntitiesProbe + .expectMsgType[String]) shouldEqual Set("1", "2", "3") // 1-2 from the snapshot, 3 from a replayed message + rememberedEntitiesProbe.expectNoMessage() + } + + withSystem(config, typeName, "OldModeAfterMigration") { (system, region, _) => + val probe = TestProbe()(system) + region.tell(Message(1), probe.ref) + import scala.concurrent.duration._ + probe.expectNoMessage(5.seconds) // sharding should have failed to start + } + } + "not allow going back to persistence mode based on a snapshot" in { + val typeName = "Snapshots" + withSystem(configForNewMode, typeName, "NewMode") { (system, region, _) => + val probe = TestProbe()(system) + for (i <- 1 to 5) { + region.tell(Message(i), probe.ref) + probe.expectMsg("ack") + } + } + + withSystem(config, typeName, "OldMode") { (system, region, _) => + val probe = TestProbe()(system) + region.tell(Message(1), probe.ref) + probe.expectNoMessage(2.seconds) + } + } + + def withSystem(config: Config, typeName: String, systemName: String)( + f: (ActorSystem, ActorRef, TestProbe) => Unit) = { + val system = ActorSystem(systemName, config) + Cluster(system).join(Cluster(system).selfAddress) + try { + val rememberedEntitiesProbe = TestProbe()(system) + val region = ClusterSharding(system).start( + typeName, + Props(new PA()), + extractEntityId, + extractShardId(rememberedEntitiesProbe.ref)) + f(system, region, rememberedEntitiesProbe) + } finally { + Await.ready(system.terminate(), 20.seconds) + } + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategySpec.scala index 2384907b25..7700e5700e 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/external/ExternalShardAllocationStrategySpec.scala @@ -16,6 +16,7 @@ import scala.concurrent.duration._ class ExternalShardAllocationStrategySpec extends AkkaSpec(""" akka.actor.provider = cluster akka.loglevel = INFO + akka.remote.artery.canonical.port = 0 """) { val requester = TestProbe() diff --git a/akka-docs/src/main/paradox/includes/cluster.md b/akka-docs/src/main/paradox/includes/cluster.md index 242b956d87..6227e376a1 100644 --- a/akka-docs/src/main/paradox/includes/cluster.md +++ b/akka-docs/src/main/paradox/includes/cluster.md @@ -51,7 +51,12 @@ Reliable delivery and flow control of messages between actors in the Cluster. @@@ warning -Persistence for state store mode is deprecated. +Persistence for state store mode is deprecated. It is recommended to migrate to `ddata` for the coordinator state and if using replicated entities +migrate to `eventsourced` for the replicated entities state. + +The data written by the deprecated `persistence` state store mode for remembered entities can be read by the new remember entities `eventsourced` mode. + +Once you've migrated you can not go back to `persistence` mode. @@@ diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 3e1aa3e4d1..2d5eb70780 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -210,23 +210,24 @@ See the API documentation of @scala[`akka.cluster.sharding.ShardAllocationStrate See @ref:[Cluster Sharding concepts](cluster-sharding-concepts.md). -## Sharding State Store Mode +## Sharding State There are two cluster sharding states managed: 1. @ref:[ShardCoordinator State](cluster-sharding-concepts.md#shardcoordinator-state) - the `Shard` locations -1. @ref:[Remembering Entities](#remembering-entities) - the entities in each `Shard`, which is optional, and disabled by default +1. @ref:[Remembering Entities](#remembering-entities) - the active shards and the entities in each `Shard`, which is optional, and disabled by default -For these, there are currently two modes which define how these states are stored: + +### State Store Mode + +For shard coordinator state, there are two modes which define how these states are stored. * @ref:[Distributed Data Mode](#distributed-data-mode) - uses Akka @ref:[Distributed Data](distributed-data.md) (CRDTs) (the default) * @ref:[Persistence Mode](#persistence-mode) - (deprecated) uses Akka @ref:[Persistence](persistence.md) (Event Sourcing) @@include[cluster.md](../includes/cluster.md) { #sharding-persistence-mode-deprecated } - -Changing the mode requires @ref:[a full cluster restart](../additional/rolling-updates.md#cluster-sharding-configuration-change). -### Distributed Data Mode +#### Distributed Data Mode This mode is enabled with configuration (enabled by default): @@ -238,9 +239,6 @@ The state of the `ShardCoordinator` is replicated across the cluster but is not The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency. When all nodes in the cluster have been stopped, the state is no longer needed and dropped. -The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to -disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available. - Cluster Sharding uses its own Distributed Data `Replicator` per node. If using roles with sharding there is one `Replicator` per role, which enables a subset of all nodes for some entity types and another subset for other entity types. Each such replicator has a name @@ -252,9 +250,9 @@ The settings for Distributed Data are configured in the section `akka.cluster.sharding.distributed-data`. It's not possible to have different `distributed-data` settings for different sharding entity types. -### Persistence Mode +#### Persistence Mode -This mode is enabled with configuration: +Enable persistence with configuration: ``` akka.cluster.sharding.state-store-mode = persistence @@ -262,15 +260,66 @@ akka.cluster.sharding.state-store-mode = persistence Since it is running in a cluster @ref:[Persistence](persistence.md) must be configured with a distributed journal. -@@@ note +@@@ warning -Persistence mode for @ref:[Remembering Entities](#remembering-entities) will be replaced by a pluggable data access API with storage implementations, -see @github[#27763](#27763). -New sharding applications should no longer choose persistence mode. Existing users of persistence mode -[can eventually migrate to the replacement options](https://github.com/akka/akka/issues/26177). +Persistence mode for @ref:[Remembering Entities](#remembering-entities) has been replaced by a remember entities state mode. It should not be +used for new projects and existing projects should migrate as soon as possible. @@@ +### Remember Entities State Store + +For remember entities state store the options are: + +1. `ddata` +1. `eventsourced` + +#### Distributed Data Mode + +Enable ddata mode with (enabled by default): + +``` +akka.cluster.sharding.remember-entities-store = ddata +``` + +The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to +disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available. + +If you are running in an environment that doesn't have access to disk between restarts you can either disable the durable storage in +ddata by setting: + +``` +akka.cluster.sharding.distributed-data.durable.keys = [] +``` + +To support remembering entities in this type of environment after a full cluster shutdown use the `eventsourced` remember entities store instead. + +#### Event sourced mode + +Enable `eventsourced` mode with: + +``` +akka.cluster.sharding.remember-entities-store = eventsourced +``` + +This mode uses @ref:[Event Sourcing](./persistence.md) to store the active shards and active entities for each shard so a persistence and snapshot plugin must be configured. + +``` +akka.cluster.sharding.journal-plugin-id = +akka.cluster.sharding.snapshot-plugin-id = +``` + +### Migrating from Persistence mode + +Persistence mode for shard coordinator state is deprecated. If not using remembered entities you can migrate to ddata with a full cluster restart. + +If using remembered entities there are two migration options: + +* `ddata` for the state store and `ddata` for remembering entities. All remembered entities will be lost after a full cluster restart. +* `ddata` for the state store and `eventsourced` for remembering entities. The new `eventsourced` remembering entities store will read the data written by the old `persistence` mode so your remembered entities will be remembered. + +Once you have migrated you can not go back to the old persistence store. + ## Passivation If the state of the entities are persistent you may stop entities that are not used to