[Only] Deprecate Persistent mode of Cluster Sharding (#27585)

This commit is contained in:
Helena Edelson 2019-08-30 07:25:29 -07:00 committed by GitHub
parent bcf8797443
commit 186b2bbc70
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 65 deletions

View file

@ -103,8 +103,7 @@ object ClusterShardingSettings {
if (name == StateStoreModePersistence.name) StateStoreModePersistence
else if (name == StateStoreModeDData.name) StateStoreModeDData
else
throw new IllegalArgumentException(
"Not recognized StateStoreMode, only 'persistence' and 'ddata' are supported.")
throw new IllegalArgumentException("Not recognized StateStoreMode, only 'ddata' is supported.")
}
final case object StateStoreModePersistence extends StateStoreMode { override def name = "persistence" }
final case object StateStoreModeDData extends StateStoreMode { override def name = "ddata" }

View file

@ -281,6 +281,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
log.warning("Cluster Sharding has been set to use the deprecated `persistence` state store mode.")
if (settings.shouldHostShard(cluster)) {
regions.get(typeName) match {
case null =>

View file

@ -4,16 +4,17 @@
package akka.cluster.sharding
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.InternalApi
import com.typesafe.config.Config
import akka.cluster.Cluster
import akka.cluster.singleton.ClusterSingletonManagerSettings
import akka.coordination.lease.LeaseUsageSettings
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config
object ClusterShardingSettings {
@ -299,7 +300,8 @@ final class ClusterShardingSettings(
tuningParameters,
coordinatorSingletonSettings)
import ClusterShardingSettings.{ StateStoreModeDData, StateStoreModePersistence }
import ClusterShardingSettings.StateStoreModeDData
import ClusterShardingSettings.StateStoreModePersistence
require(
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData,
s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'")

View file

@ -124,31 +124,31 @@ private[akka] object Shard {
handOffStopMessage: Any,
replicator: ActorRef,
majorityMinCap: Int): Props = {
if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) {
Props(
new DDataShard(
typeName,
shardId,
entityProps,
settings,
extractEntityId,
extractShardId,
handOffStopMessage,
replicator,
majorityMinCap)).withDeploy(Deploy.local)
} else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
Props(
new PersistentShard(
typeName,
shardId,
entityProps,
settings,
extractEntityId,
extractShardId,
handOffStopMessage)).withDeploy(Deploy.local)
else
Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage))
.withDeploy(Deploy.local)
(if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) {
Props(
new DDataShard(
typeName,
shardId,
entityProps,
settings,
extractEntityId,
extractShardId,
handOffStopMessage,
replicator,
majorityMinCap))
} else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
Props(
new PersistentShard(
typeName,
shardId,
entityProps,
settings,
extractEntityId,
extractShardId,
handOffStopMessage))
else {
Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage))
}).withDeploy(Deploy.local)
}
case object PassivateIdleTick extends NoSerializationVerificationNeeded
@ -1019,6 +1019,6 @@ final class ConstantRateEntityRecoveryStrategy(
}
._2
private def scheduleEntities(interval: FiniteDuration, entityIds: Set[EntityId]) =
private def scheduleEntities(interval: FiniteDuration, entityIds: Set[EntityId]): Future[Set[EntityId]] =
after(interval, actorSystem.scheduler)(Future.successful[Set[EntityId]](entityIds))
}

View file

@ -4,8 +4,6 @@
package akka.cluster.sharding
import akka.util.Timeout
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
@ -26,6 +24,8 @@ import akka.cluster.ddata.GSetKey
import akka.cluster.ddata.Key
import akka.cluster.ddata.ReplicatedData
import akka.cluster.ddata.SelfUniqueAddress
import akka.util.Timeout
import com.github.ghik.silencer.silent
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
@ -38,6 +38,7 @@ object ShardCoordinator {
* INTERNAL API
* Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor.
*/
@silent("deprecated")
private[akka] def props(
typeName: String,
settings: ClusterShardingSettings,
@ -920,6 +921,7 @@ abstract class ShardCoordinator(
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
@deprecated("Use `ddata` mode, persistence mode is deprecated.", "2.6.0")
class PersistentShardCoordinator(
typeName: String,
settings: ClusterShardingSettings,

View file

@ -226,7 +226,7 @@ will stop all entities in that shard by sending the specified `stopMessage`
(default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion`
owning the entities will acknowledge the handoff as completed to the coordinator.
Thereafter the coordinator will reply to requests for the location of
the shard and thereby allocate a new home for the shard and then buffered messages in the
the shard, thereby allocating a new home for the shard, and then buffered messages in the
`ShardRegion` actors are delivered to the new location. This means that the state of the entities
are not transferred or migrated. If the state of the entities are of importance it should be
persistent (durable), e.g. with @ref:[Persistence](persistence.md), so that it can be recovered at the new
@ -251,10 +251,12 @@ the number of shards (and therefore load) between different nodes may be signifi
### ShardCoordinator State
The state of shard locations in the `ShardCoordinator` is persistent (durable) with
@ref:[Distributed Data](distributed-data.md) or @ref:[Persistence](persistence.md) to survive failures. When a crashed or
@ref:[Distributed Data](distributed-data.md) to survive failures.
When a crashed or
unreachable coordinator node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
actor will take over and the state is recovered. During such a failure period shards
with known location are still available, while messages for new (unknown) shards
with a known location are still available, while messages for new (unknown) shards
are buffered until the new `ShardCoordinator` becomes available.
### Message ordering
@ -263,7 +265,7 @@ As long as a sender uses the same `ShardRegion` actor to deliver messages to an
actor the order of the messages is preserved. As long as the buffer limit is not reached
messages are delivered on a best effort basis, with at-most once delivery semantics,
in the same way as ordinary message sending. Reliable end-to-end messaging, with
at-least-once semantics can be added by using `AtLeastOnceDelivery` in @ref:[Persistence](persistence.md).
at-least-once semantics can be added by using `AtLeastOnceDelivery` in @ref:[Persistence](persistence.md).
### Overhead
@ -274,18 +276,23 @@ shard resolution, e.g. to avoid too fine grained shards. Once a shard's location
the only overhead is sending a message via the `ShardRegion` rather than directly.
<a id="cluster-sharding-mode"></a>
## Distributed Data vs. Persistence Mode
## Sharding State Store Mode
The state of the coordinator and the state of [Remembering Entities](#cluster-sharding-remembering) of the shards
are persistent (durable) to survive failures. @ref:[Distributed Data](distributed-data.md) or @ref:[Persistence](persistence.md)
can be used for the storage. Distributed Data is used by default.
There are two cluster sharding states managed:
1. @ref:[ShardCoordinator State](#shardcoordinator-state) - the `Shard` locations
1. @ref:[Remembering Entities](#remembering-entities) - the entities in each `Shard`, which is optional, and disabled by default
The functionality when using the two modes is the same. If your sharded entities are not using Akka Persistence
themselves it is more convenient to use the Distributed Data mode, since then you don't have to
setup and operate a separate data store (e.g. Cassandra) for persistence. Aside from that, there are
no major reasons for using one mode over the other.
For these, there are currently two modes which define how these states are stored:
* @ref:[Distributed Data Mode](#distributed-data-mode) - uses Akka @ref:[Distributed Data](distributed-data.md) (CRDTs) (the default)
* @ref:[Persistence Mode](#persistence-mode) - (deprecated) uses Akka @ref:[Persistence](persistence.md) (Event Sourcing)
Changing persistence mode requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change).
@@@ warning
Persistence for state store mode is deprecated.
@@@
Changing the mode requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change).
### Distributed Data Mode
@ -299,10 +306,10 @@ The state of the `ShardCoordinator` is replicated across the cluster but is not
The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency.
When all nodes in the cluster have been stopped, the state is no longer needed and dropped.
The state of [Remembering Entities](#cluster-sharding-remembering) is durable and stored to
The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to
disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available.
Cluster Sharding is using its own Distributed Data `Replicator` per node.
Cluster Sharding uses its own Distributed Data `Replicator` per node.
If using roles with sharding there is one `Replicator` per role, which enables a subset of
all nodes for some entity types and another subset for other entity types. Each such replicator has a name
that contains the node role and therefore the role configuration must be the same on all nodes in the
@ -323,6 +330,14 @@ akka.cluster.sharding.state-store-mode = persistence
Since it is running in a cluster @ref:[Persistence](persistence.md) must be configured with a distributed journal.
@@@ note
Persistence mode for @ref:[Remembering Entities](#remembering-entities) will be replaced by a pluggable data access API with storage implementations.
New sharding applications should no longer choose persistence mode. Existing users of persistence mode
[can eventually migrate to the replacement options](https://github.com/akka/akka/issues/26177).
@@@
## Startup after minimum number of members
It's good to use Cluster Sharding with the Cluster setting `akka.cluster.min-nr-of-members` or
@ -368,7 +383,15 @@ It is always disabled if @ref:[Remembering Entities](#remembering-entities) is e
<a id="cluster-sharding-remembering"></a>
## Remembering Entities
The list of entities in each `Shard` can be made persistent (durable) by setting
Remembering entities pertains to restarting entities after a rebalance or recovering from a crash.
Enabling or disabling (the default) this feature drives the behavior of the restarts:
* enabled: entities are restarted, even though no new messages are sent to them
* disabled: entities are restarted, on demand when a new message arrives
Note that the state of the entities themselves will not be restored unless they have been made persistent,
for example with @ref:[Event Sourcing](persistence.md).
To make the list of entities in each `Shard` persistent (durable), set
the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
`ClusterSharding.start` and making sure the `shardIdExtractor` handles
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
@ -380,13 +403,21 @@ Scala
Java
: @@snip [ClusterShardingTest.java](/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity }
When configured to remember entities, whenever a `Shard` is rebalanced onto another
This can also be configured by setting `akka.cluster.sharding.remember-entities = on`.
The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
shards are rebalanced. This cost increases with number of entities per shard, thus it is not
recommended with more than 10000 active (non passivated) entities per shard.
### Behavior When Enabled
When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto another
node or recovers after a crash it will recreate all the entities which were previously
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
sent to the parent of the entity actor, otherwise the entity will be automatically
restarted after the entity restart backoff specified in the configuration.
When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
When [Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are
stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since
the default directory contains the remote port of the actor system. If using a dynamically
@ -401,17 +432,11 @@ you can disable durable storage and benefit from better performance by using the
akka.cluster.sharding.distributed-data.durable.keys = []
```
When `rememberEntities` is set to false, a `Shard` will not automatically restart any entities
after a rebalance or recovering from a crash. Entities will only be started once the first message
for that entity has been received in the `Shard`. Entities will not be restarted if they stop without
using a `Passivate`.
### Behavior When Not Enabled
Note that the state of the entities themselves will not be restored unless they have been made persistent,
e.g. with @ref:[Persistence](persistence.md).
The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
shards are rebalanced. This cost increases with number of entities per shard and we currently don't
recommend using it with more than 10000 active (non passivated) entities per shard.
When `rememberEntities` is disabled (the default), a `Shard` will not automatically restart any entities
after a rebalance or recovering from a crash. Instead, entities are started once the first message
for that entity has been received in the `Shard`.
## Supervision
@ -451,9 +476,9 @@ graceful leaving process of a cluster member.
<a id="removeinternalclustershardingdata"></a>
## Removal of Internal Cluster Sharding Data
The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence.
This data can safely be removed when restarting the whole Akka Cluster.
Note that this is not application data.
The Cluster Sharding `ShardCoordinator` stores locations of the shards.
This data is safely be removed when restarting the whole Akka Cluster.
Note that this does not include application data.
There is a utility program `akka.cluster.sharding.RemoveInternalClusterShardingData`
that removes this data.

View file

@ -132,7 +132,7 @@ Scala
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #behavior }
## Cluster Sharding and persistence
## Cluster Sharding and EventSourcedBehavior
In a use case where the number of persistent actors needed are higher than what would fit in the memory of one node or
where resilience is important so that if a node crashes the persistent actors are quickly started on a new node and can