From 186b2bbc7011246f12a440ba95c2f1c6802af364 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Fri, 30 Aug 2019 07:25:29 -0700 Subject: [PATCH] [Only] Deprecate Persistent mode of Cluster Sharding (#27585) --- .../typed/ClusterShardingSettings.scala | 3 +- .../cluster/sharding/ClusterSharding.scala | 3 + .../sharding/ClusterShardingSettings.scala | 8 +- .../scala/akka/cluster/sharding/Shard.scala | 52 +++++------ .../cluster/sharding/ShardCoordinator.scala | 6 +- .../src/main/paradox/cluster-sharding.md | 87 ++++++++++++------- .../src/main/paradox/typed/persistence.md | 2 +- 7 files changed, 96 insertions(+), 65 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index e50b58dc6a..bc2cfcbb2e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -103,8 +103,7 @@ object ClusterShardingSettings { if (name == StateStoreModePersistence.name) StateStoreModePersistence else if (name == StateStoreModeDData.name) StateStoreModeDData else - throw new IllegalArgumentException( - "Not recognized StateStoreMode, only 'persistence' and 'ddata' are supported.") + throw new IllegalArgumentException("Not recognized StateStoreMode, only 'ddata' is supported.") } final case object StateStoreModePersistence extends StateStoreMode { override def name = "persistence" } final case object StateStoreModeDData extends StateStoreMode { override def name = "ddata" } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index cddc4d95e3..ab2641ded3 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -281,6 +281,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = { + if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) + log.warning("Cluster Sharding has been set to use the deprecated `persistence` state store mode.") + if (settings.shouldHostShard(cluster)) { regions.get(typeName) match { case null => diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 392b7bca3d..4a60ccc710 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -4,16 +4,17 @@ package akka.cluster.sharding -import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.actor.NoSerializationVerificationNeeded import akka.annotation.InternalApi -import com.typesafe.config.Config import akka.cluster.Cluster import akka.cluster.singleton.ClusterSingletonManagerSettings import akka.coordination.lease.LeaseUsageSettings import akka.util.JavaDurationConverters._ +import com.typesafe.config.Config object ClusterShardingSettings { @@ -299,7 +300,8 
@@ final class ClusterShardingSettings( tuningParameters, coordinatorSingletonSettings) - import ClusterShardingSettings.{ StateStoreModeDData, StateStoreModePersistence } + import ClusterShardingSettings.StateStoreModeDData + import ClusterShardingSettings.StateStoreModePersistence require( stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData, s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 32647c50cd..5be4791513 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -124,31 +124,31 @@ private[akka] object Shard { handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int): Props = { - if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) { - Props( - new DDataShard( - typeName, - shardId, - entityProps, - settings, - extractEntityId, - extractShardId, - handOffStopMessage, - replicator, - majorityMinCap)).withDeploy(Deploy.local) - } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) - Props( - new PersistentShard( - typeName, - shardId, - entityProps, - settings, - extractEntityId, - extractShardId, - handOffStopMessage)).withDeploy(Deploy.local) - else - Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) - .withDeploy(Deploy.local) + (if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) { + Props( + new DDataShard( + typeName, + shardId, + entityProps, + settings, + extractEntityId, + extractShardId, + handOffStopMessage, + replicator, + majorityMinCap)) + } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) + Props( + new PersistentShard( + typeName, + shardId, + entityProps, + settings, + extractEntityId, + extractShardId, + handOffStopMessage)) + else { + Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) + }).withDeploy(Deploy.local) } case object PassivateIdleTick extends NoSerializationVerificationNeeded @@ -1019,6 +1019,6 @@ final class ConstantRateEntityRecoveryStrategy( } ._2 - private def scheduleEntities(interval: FiniteDuration, entityIds: Set[EntityId]) = + private def scheduleEntities(interval: FiniteDuration, entityIds: Set[EntityId]): Future[Set[EntityId]] = after(interval, actorSystem.scheduler)(Future.successful[Set[EntityId]](entityIds)) } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index d4e2bd3aae..b67cbe89ed 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -4,8 +4,6 @@ package akka.cluster.sharding -import akka.util.Timeout - import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ @@ -26,6 +24,8 @@ import akka.cluster.ddata.GSetKey import akka.cluster.ddata.Key import akka.cluster.ddata.ReplicatedData import 
akka.cluster.ddata.SelfUniqueAddress +import akka.util.Timeout +import com.github.ghik.silencer.silent /** * @see [[ClusterSharding$ ClusterSharding extension]] @@ -38,6 +38,7 @@ object ShardCoordinator { * INTERNAL API * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor. */ + @silent("deprecated") private[akka] def props( typeName: String, settings: ClusterShardingSettings, @@ -920,6 +921,7 @@ abstract class ShardCoordinator( * * @see [[ClusterSharding$ ClusterSharding extension]] */ +@deprecated("Use `ddata` mode, persistence mode is deprecated.", "2.6.0") class PersistentShardCoordinator( typeName: String, settings: ClusterShardingSettings, diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index 2834727d46..56eb01efb1 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -226,7 +226,7 @@ will stop all entities in that shard by sending the specified `stopMessage` (default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of -the shard and thereby allocate a new home for the shard and then buffered messages in the +the shard, thereby allocating a new home for the shard, and then buffered messages in the `ShardRegion` actors are delivered to the new location. This means that the state of the entities are not transferred or migrated. If the state of the entities are of importance it should be persistent (durable), e.g. with @ref:[Persistence](persistence.md), so that it can be recovered at the new @@ -251,10 +251,12 @@ the number of shards (and therefore load) between different nodes may be signifi ### ShardCoordinator State The state of shard locations in the `ShardCoordinator` is persistent (durable) with -@ref:[Distributed Data](distributed-data.md) or @ref:[Persistence](persistence.md) to survive failures. When a crashed or +@ref:[Distributed Data](distributed-data.md) to survive failures. + +When a crashed or unreachable coordinator node has been removed (via down) from the cluster a new `ShardCoordinator` singleton actor will take over and the state is recovered. During such a failure period shards -with known location are still available, while messages for new (unknown) shards +with a known location are still available, while messages for new (unknown) shards are buffered until the new `ShardCoordinator` becomes available. ### Message ordering @@ -263,7 +265,7 @@ As long as a sender uses the same `ShardRegion` actor to deliver messages to an actor the order of the messages is preserved. As long as the buffer limit is not reached messages are delivered on a best effort basis, with at-most once delivery semantics, in the same way as ordinary message sending. Reliable end-to-end messaging, with -at-least-once semantics can be added by using `AtLeastOnceDelivery` in @ref:[Persistence](persistence.md). +at-least-once semantics can be added by using `AtLeastOnceDelivery` in @ref:[Persistence](persistence.md). ### Overhead @@ -274,18 +276,23 @@ shard resolution, e.g. to avoid too fine grained shards. Once a shard's location the only overhead is sending a message via the `ShardRegion` rather than directly. -## Distributed Data vs. 
Persistence Mode
+## Sharding State Store Mode
-The state of the coordinator and the state of [Remembering Entities](#cluster-sharding-remembering) of the shards
-are persistent (durable) to survive failures. @ref:[Distributed Data](distributed-data.md) or @ref:[Persistence](persistence.md)
-can be used for the storage. Distributed Data is used by default.
+Cluster Sharding manages two types of state:
+1. @ref:[ShardCoordinator State](#shardcoordinator-state) - the `Shard` locations
+1. @ref:[Remembering Entities](#remembering-entities) - the entities in each `Shard`, which is optional and disabled by default
+
+There are currently two modes that define how this state is stored:
+* @ref:[Distributed Data Mode](#distributed-data-mode) - uses Akka @ref:[Distributed Data](distributed-data.md) (CRDTs) (the default)
+* @ref:[Persistence Mode](#persistence-mode) - (deprecated) uses Akka @ref:[Persistence](persistence.md) (Event Sourcing)
-The functionality when using the two modes is the same. If your sharded entities are not using Akka Persistence
-themselves it is more convenient to use the Distributed Data mode, since then you don't have to
-setup and operate a separate data store (e.g. Cassandra) for persistence. Aside from that, there are
-no major reasons for using one mode over the other.
+@@@ warning
-Changing persistence mode requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change).
+The `persistence` state store mode is deprecated.
+
+@@@
+
+Changing the mode requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change). ### Distributed Data Mode
@@ -299,10 +306,10 @@ The state of the `ShardCoordinator` is replicated across the cluster but is not The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency. When all nodes in the cluster have been stopped, the state is no longer needed and dropped.
-The state of [Remembering Entities](#cluster-sharding-remembering) is durable and stored to
+The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available.
-Cluster Sharding is using its own Distributed Data `Replicator` per node.
+Cluster Sharding uses its own Distributed Data `Replicator` per node. If using roles with sharding there is one `Replicator` per role, which enables a subset of all nodes for some entity types and another subset for other entity types. Each such replicator has a name that contains the node role and therefore the role configuration must be the same on all nodes in the
@@ -323,6 +330,14 @@ akka.cluster.sharding.state-store-mode = persistence Since it is running in a cluster @ref:[Persistence](persistence.md) must be configured with a distributed journal.
+@@@ note
+
+Persistence mode for @ref:[Remembering Entities](#remembering-entities) will be replaced by a pluggable data access API with storage implementations.
+New sharding applications should not choose persistence mode. Existing users of persistence mode
+[can eventually migrate to the replacement options](https://github.com/akka/akka/issues/26177). 
+
+@@@
+ ## Startup after minimum number of members It's good to use Cluster Sharding with the Cluster setting `akka.cluster.min-nr-of-members` or
@@ -368,7 +383,15 @@ It is always disabled if @ref:[Remembering Entities](#remembering-entities) is e ## Remembering Entities
-The list of entities in each `Shard` can be made persistent (durable) by setting
+Remembering entities controls how entities are restarted after a rebalance or after recovering from a crash.
+Whether the feature is enabled or disabled (the default) determines the restart behavior:
+* enabled: entities are restarted even though no new messages are sent to them
+* disabled: entities are restarted on demand when a new message arrives
+
+Note that the state of the entities themselves will not be restored unless they have been made persistent,
+for example with @ref:[Event Sourcing](persistence.md).
+
+To make the list of entities in each `Shard` persistent (durable), set the `rememberEntities` flag to true in `ClusterShardingSettings` when calling `ClusterSharding.start` and making sure the `shardIdExtractor` handles `Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
@@ -380,13 +403,21 @@ Scala Java : @@snip [ClusterShardingTest.java](/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity }
-When configured to remember entities, whenever a `Shard` is rebalanced onto another
+This can also be configured by setting `akka.cluster.sharding.remember-entities = on`.
+
+The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
+shards are rebalanced. This cost increases with the number of entities per shard, so it is not
+recommended with more than 10000 active (non-passivated) entities per shard.
+
+### Behavior When Enabled
+
+When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto another node or recovers after a crash it will recreate all the entities which were previously running in that `Shard`. To permanently stop entities, a `Passivate` message must be sent to the parent of the entity actor, otherwise the entity will be automatically restarted after the entity restart backoff specified in the configuration.
-When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
+When [Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since the default directory contains the remote port of the actor system. If using a dynamically
@@ -401,17 +432,11 @@ you can disable durable storage and benefit from better performance by using the akka.cluster.sharding.distributed-data.durable.keys = [] ```
-When `rememberEntities` is set to false, a `Shard` will not automatically restart any entities
-after a rebalance or recovering from a crash. Entities will only be started once the first message
-for that entity has been received in the `Shard`. Entities will not be restarted if they stop without
-using a `Passivate`.
+### Behavior When Not Enabled
-Note that the state of the entities themselves will not be restored unless they have been made persistent,
-e.g. with @ref:[Persistence](persistence.md).
-
-The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
-shards are rebalanced. 
This cost increases with number of entities per shard and we currently don't
-recommend using it with more than 10000 active (non passivated) entities per shard.
+When `rememberEntities` is disabled (the default), a `Shard` will not automatically restart any entities
+after a rebalance or recovering from a crash. Instead, entities are started once the first message
+for that entity has been received in the `Shard`. ## Supervision
@@ -451,9 +476,9 @@ graceful leaving process of a cluster member. ## Removal of Internal Cluster Sharding Data
-The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence.
-This data can safely be removed when restarting the whole Akka Cluster.
-Note that this is not application data.
+The Cluster Sharding `ShardCoordinator` stores the locations of the shards.
+This data can safely be removed when restarting the whole Akka Cluster.
+Note that this does not include application data. There is a utility program `akka.cluster.sharding.RemoveInternalClusterShardingData` that removes this data.
diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md
index 92a558535b..bad31d8735 100644
--- a/akka-docs/src/main/paradox/typed/persistence.md
+++ b/akka-docs/src/main/paradox/typed/persistence.md
@@ -132,7 +132,7 @@ Scala Java : @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #behavior }
-## Cluster Sharding and persistence
+## Cluster Sharding and EventSourcedBehavior In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can
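
For context on what this deprecation means for applications, the following is a minimal, hypothetical sketch (not taken from this patch) of classic Cluster Sharding using the default `ddata` state store mode, with a `ShardRegion.StartEntity` handler for remember-entities and `Passivate` for stopping entities permanently. The `CounterApp`, `Counter`, `EntityEnvelope` and `Get` names are invented for illustration, and cluster provider and seed node settings are assumed to already be present in `application.conf`.

```scala
// Illustrative sketch only: classic Cluster Sharding with the default `ddata` state store mode.
import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props, ReceiveTimeout }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

object CounterApp {
  final case class EntityEnvelope(id: Long, payload: Any)
  final case class Get(id: Long)

  class Counter extends Actor {
    // Passivate after idling so a remembered entity can be stopped permanently
    context.setReceiveTimeout(120.seconds)
    private var count = 0

    def receive: Receive = {
      case Get(_)         => sender() ! count
      case ReceiveTimeout => context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
      case _              => count += 1
    }
  }

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case EntityEnvelope(id, payload) => (id.toString, payload)
    case msg @ Get(id)               => (id.toString, msg)
  }

  private val numberOfShards = 100

  val extractShardId: ShardRegion.ExtractShardId = {
    case EntityEnvelope(id, _)       => (id % numberOfShards).toString
    case Get(id)                     => (id % numberOfShards).toString
    // Only needed when remember-entities is enabled
    case ShardRegion.StartEntity(id) => (id.toLong % numberOfShards).toString
  }

  def main(args: Array[String]): Unit = {
    // `ddata` is the default; set explicitly here only for clarity
    val config = ConfigFactory
      .parseString("akka.cluster.sharding.state-store-mode = ddata")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ShardingSystem", config)

    val counterRegion: ActorRef = ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props[Counter](),
      settings = ClusterShardingSettings(system).withRememberEntities(true),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)

    counterRegion ! EntityEnvelope(42L, "increment")
  }
}
```

Applications that stay on the deprecated `persistence` mode keep working, but will log the warning added in `ClusterSharding.scala` above until they migrate.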