Cluster Sharding: General Typed docs cleanup after all API changes (#27764)
This commit is contained in:
parent
20238d975f
commit
f3f2ffd7fb
15 changed files with 519 additions and 419 deletions
|
|
@@ -86,7 +86,7 @@ in an application composed of multiple JARs to reside under a single package nam
|
|||
might scan all classes from `com.example.plugins` for specific service implementations with that package existing in
|
||||
several contributed JARs.
|
||||
While it is possible to support overlapping packages with complex manifest headers, it's much better to use non-overlapping
|
||||
package spaces and facilities such as @ref:[Akka Cluster](../typed/cluster-specification.md)
|
||||
package spaces and facilities such as @ref:[Akka Cluster](../typed/cluster-concepts.md)
|
||||
for service discovery. Stylistically, many organizations opt to use the root package path as the name of the bundle
|
||||
distribution file.
|
||||
|
||||
|
|
|
|||
|
|
@@ -36,7 +36,7 @@ Additionally you can find advice on @ref:[Persistence - Schema Evolution](../per
|
|||
|
||||
## Cluster Sharding
|
||||
|
||||
During a rolling upgrade, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../cluster-sharding.md#shard-rebalancing),
|
||||
During a rolling upgrade, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../typed/cluster-sharding-concepts.md#shard-rebalancing),
|
||||
to an old or new node in the cluster, based on the pluggable allocation strategy and settings.
|
||||
When an old node is stopped the shards that were running on it are moved to one of the
|
||||
other old nodes remaining in the cluster. The `ShardCoordinator` is itself a cluster singleton.
|
||||
|
|
|
|||
|
|
@@ -77,7 +77,7 @@ up a large cluster into smaller groups of nodes for better scalability.
|
|||
## Membership
|
||||
|
||||
Some @ref[membership transitions](typed/cluster-membership.md#membership-lifecycle) are managed by
|
||||
one node called the @ref[leader](typed/cluster-specification.md#leader). There is one leader per data center
|
||||
one node called the @ref[leader](typed/cluster-concepts.md#leader). There is one leader per data center
|
||||
and it is responsible for these transitions for the members within the same data center. Members of
|
||||
other data centers are managed independently by the leader of the respective data center. These actions
|
||||
cannot be performed while there are any unreachability observations among the nodes in the data center,
|
||||
|
|
@@ -105,7 +105,7 @@ Java
|
|||
|
||||
## Failure Detection
|
||||
|
||||
@ref[Failure detection](typed/cluster-specification.md#failure-detector) is performed by sending heartbeat messages
|
||||
@ref[Failure detection](typed/cluster-concepts.md#failure-detector) is performed by sending heartbeat messages
|
||||
to detect if a node is unreachable. This is done more frequently and with more certainty among
|
||||
the nodes in the same data center than across data centers. The failure detection across different data centers should
|
||||
be interpreted as an indication of a problem with the network link between the data centers.
|
||||
|
|
|
|||
|
|
@@ -1,7 +1,7 @@
|
|||
# Classic Cluster Sharding
|
||||
|
||||
@@include[includes.md](includes.md) { #actor-api }
|
||||
For the new API see @ref:[cluster-sharding](typed/cluster-sharding.md).
|
||||
For the new API see @ref:[Cluster Sharding](typed/cluster-sharding.md).
|
||||
|
||||
## Dependency
|
||||
|
||||
|
|
@@ -22,38 +22,9 @@ to see what this looks like in practice.
|
|||
|
||||
## Introduction
|
||||
|
||||
Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to
|
||||
be able to interact with them using their logical identifier, but without having to care about
|
||||
their physical location in the cluster, which might also change over time.
|
||||
For an introduction to Sharding concepts see @ref:[Cluster Sharding](typed/cluster-sharding.md).
|
||||
|
||||
It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology.
|
||||
Here we call these actors "entities". These actors typically have persistent (durable) state,
|
||||
but this feature is not limited to actors with persistent state.
|
||||
|
||||
Cluster sharding is typically used when you have many stateful actors that together consume
|
||||
more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
|
||||
it might be easier to run them on a @ref:[Cluster Singleton](cluster-singleton.md) node.
|
||||
|
||||
In this context sharding means that actors with an identifier, so called entities,
|
||||
can be automatically distributed across multiple nodes in the cluster. Each entity
|
||||
actor runs only at one place, and messages can be sent to the entity without requiring
|
||||
the sender to know the location of the destination actor. This is achieved by sending
|
||||
the messages via a `ShardRegion` actor provided by this extension, which knows how
|
||||
to route the message with the entity id to the final destination.
|
||||
|
||||
Cluster sharding will not be active on members with status @ref:[WeaklyUp](typed/cluster-membership.md#weaklyup-members)
|
||||
if that feature is enabled.
|
||||
|
||||
@@@ warning
|
||||
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See @ref:[Downing](typed/cluster.md#automatic-vs-manual-downing).
|
||||
|
||||
@@@
|
||||
|
||||
## An Example
|
||||
## Basic example
|
||||
|
||||
This is what an entity actor may look like:
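As a rough, hypothetical sketch, a minimal (non-persistent) entity actor could be as simple as:

```scala
import akka.actor.Actor

// a hypothetical, non-persistent entity handling plain string commands;
// the entity id is the name the Shard parent started this actor with (self.path.name)
class Counter extends Actor {
  private var count = 0

  def receive: Receive = {
    case "increment" => count += 1
    case "get"       => sender() ! count
  }
}
```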
|
||||
|
||||
|
|
@@ -140,205 +111,36 @@ A more comprehensive sample is available in the
|
|||
|
||||
## How it works
|
||||
|
||||
The `ShardRegion` actor is started on each node in the cluster, or group of nodes
|
||||
tagged with a specific role. The `ShardRegion` is created with two application specific
|
||||
functions to extract the entity identifier and the shard identifier from incoming messages.
|
||||
A `Shard` is a group of entities that will be managed together. For the first message in a
|
||||
specific shard the `ShardRegion` requests the location of the shard from a central coordinator,
|
||||
the `ShardCoordinator`.
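As a sketch of what this registration looks like with the classic API (the `EntityEnvelope` message, the shard count, and the `Counter` actor from the sketch above are assumptions for illustration):

```scala
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

object CounterSharding {
  final case class EntityEnvelope(entityId: String, payload: Any)

  val numberOfShards = 100

  // extracts the entity identifier and the message to forward to the entity
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case EntityEnvelope(id, payload) => (id, payload)
  }

  // extracts the shard identifier from an incoming message
  // (with rememberEntities enabled, ShardRegion.StartEntity(id) must be handled here as well)
  val extractShardId: ShardRegion.ExtractShardId = {
    case EntityEnvelope(id, _) => (math.abs(id.hashCode) % numberOfShards).toString
  }

  // registers the entity type and returns the ShardRegion actor for this node
  def start(system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props[Counter],
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
}
```

Messages are then sent via the returned region, for example `counterRegion ! CounterSharding.EntityEnvelope("counter-1", "increment")`, and the region routes them to the right node, shard and entity.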
|
||||
|
||||
The `ShardCoordinator` decides which `ShardRegion` shall own the `Shard` and informs
|
||||
that `ShardRegion`. The region will confirm this request and create the `Shard` supervisor
|
||||
as a child actor. The individual `Entities` will then be created when needed by the `Shard`
|
||||
actor. Incoming messages thus travel via the `ShardRegion` and the `Shard` to the target
|
||||
`Entity`.
|
||||
|
||||
If the shard home is another `ShardRegion` instance messages will be forwarded
|
||||
to that `ShardRegion` instance instead. While resolving the location of a
|
||||
shard incoming messages for that shard are buffered and later delivered when the
|
||||
shard home is known. Subsequent messages to the resolved shard can be delivered
|
||||
to the target destination immediately without involving the `ShardCoordinator`.
|
||||
|
||||
### Scenarios
|
||||
|
||||
Once a `Shard` location is known `ShardRegion`s send messages directly. Here are the
|
||||
scenarios for getting to this state. In the scenarios the following notation is used:
|
||||
|
||||
* `SC` - ShardCoordinator
|
||||
* `M#` - Message 1, 2, 3, etc
|
||||
* `SR#` - ShardRegion 1, 2, 3, etc
* `S#` - Shard 1, 2, 3, etc
* `E#` - Entity 1, 2, 3, etc. An entity refers to an Actor managed by Cluster Sharding.
|
||||
|
||||
Where `#` is a number to distinguish between instances as there are multiple in the Cluster.
|
||||
|
||||
#### Scenario 1: Message to an unknown shard that belongs to the local ShardRegion
|
||||
|
||||
1. Incoming message `M1` to `ShardRegion` instance `SR1`.
|
||||
2. `M1` is mapped to shard `S1`. `SR1` doesn't know about `S1`, so it asks the `SC` for the location of `S1`.
|
||||
3. `SC` answers that the home of `S1` is `SR1`.
|
||||
4. `SR1` creates child actor shard `S1` and forwards the message to it.
|
||||
5. `S1` creates child actor for `E1` and forwards the message to it.
|
||||
6. All incoming messages for `S1` which arrive at `SR1` can be handled by `SR1` without `SC`.
|
||||
|
||||
#### Scenario 2: Message to an unknown shard that belongs to a remote ShardRegion
|
||||
|
||||
1. Incoming message `M2` to `ShardRegion` instance `SR1`.
|
||||
2. `M2` is mapped to `S2`. `SR1` doesn't know about `S2`, so it asks `SC` for the location of `S2`.
|
||||
3. `SC` answers that the home of `S2` is `SR2`.
|
||||
4. `SR1` sends buffered messages for `S2` to `SR2`.
|
||||
5. All incoming messages for `S2` which arrive at `SR1` can be handled by `SR1` without `SC`. It forwards messages to `SR2`.
|
||||
6. `SR2` receives the message for `S2`, asks `SC`, which answers that the home of `S2` is `SR2`, and we are in Scenario 1 (but for `SR2`).
|
||||
|
||||
|
||||
### Shard location
|
||||
|
||||
To make sure that at most one instance of a specific entity actor is running somewhere
|
||||
in the cluster it is important that all nodes have the same view of where the shards
|
||||
are located. Therefore the shard allocation decisions are taken by the central
|
||||
`ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on
|
||||
the oldest member among all cluster nodes or a group of nodes tagged with a specific
|
||||
role.
|
||||
|
||||
The logic that decides where a shard is to be located is defined in a pluggable shard
|
||||
allocation strategy. The default implementation `ShardCoordinator.LeastShardAllocationStrategy`
|
||||
allocates new shards to the `ShardRegion` with least number of previously allocated shards.
|
||||
This strategy can be replaced by an application specific implementation.
|
||||
|
||||
### Shard Rebalancing
|
||||
|
||||
To be able to use newly added members in the cluster the coordinator facilitates rebalancing
|
||||
of shards, i.e. migrate entities from one node to another. In the rebalance process the
|
||||
coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started.
|
||||
That means they will start buffering incoming messages for that shard, in the same way as if the
|
||||
shard location is unknown. During the rebalance process the coordinator will not answer any
|
||||
requests for the location of shards that are being rebalanced, i.e. local buffering will
|
||||
continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard
|
||||
will stop all entities in that shard by sending the specified `stopMessage`
|
||||
(default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion`
|
||||
owning the entities will acknowledge the handoff as completed to the coordinator.
|
||||
Thereafter the coordinator will reply to requests for the location of
|
||||
the shard, thereby allocating a new home for the shard, and then buffered messages in the
|
||||
`ShardRegion` actors are delivered to the new location. This means that the state of the entities
|
||||
is not transferred or migrated. If the state of the entities is of importance it should be
|
||||
persistent (durable), e.g. with @ref:[Persistence](persistence.md), so that it can be recovered at the new
|
||||
location.
|
||||
|
||||
The logic that decides which shards to rebalance is defined in a pluggable shard
|
||||
allocation strategy. The default implementation `ShardCoordinator.LeastShardAllocationStrategy`
|
||||
picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
|
||||
They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
|
||||
i.e. new members in the cluster.
|
||||
|
||||
For the `LeastShardAllocationStrategy` there is a configurable threshold (`rebalance-threshold`) of
|
||||
how large the difference must be to begin the rebalancing. The difference between number of shards in
|
||||
the region with most shards and the region with least shards must be greater than the `rebalance-threshold`
|
||||
for the rebalance to occur.
|
||||
|
||||
A `rebalance-threshold` of 1 gives the best distribution and therefore typically the best choice.
|
||||
A higher threshold means that more shards can be rebalanced at the same time instead of one-by-one.
|
||||
That has the advantage that the rebalance process can be quicker but has the drawback that the
|
||||
number of shards (and therefore load) between different nodes may be significantly different.
|
||||
|
||||
### ShardCoordinator State
|
||||
|
||||
The state of shard locations in the `ShardCoordinator` is persistent (durable) with
|
||||
@ref:[Distributed Data](distributed-data.md) to survive failures.
|
||||
|
||||
When a crashed or
|
||||
unreachable coordinator node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
|
||||
actor will take over and the state is recovered. During such a failure period shards
|
||||
with a known location are still available, while messages for new (unknown) shards
|
||||
are buffered until the new `ShardCoordinator` becomes available.
|
||||
|
||||
### Message ordering
|
||||
|
||||
As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity
|
||||
actor the order of the messages is preserved. As long as the buffer limit is not reached
|
||||
messages are delivered on a best-effort basis, with at-most-once delivery semantics,
|
||||
in the same way as ordinary message sending. Reliable end-to-end messaging, with
|
||||
at-least-once semantics, can be added by using `AtLeastOnceDelivery` in @ref:[Persistence](persistence.md).
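A rough sketch of such a sender using the classic `AtLeastOnceDelivery` trait (the message types, the envelope, and the `reliable-sender` persistence id are hypothetical; the destination is assumed to be a `ShardRegion`):

```scala
import akka.actor.ActorRef
import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }

final case class Envelope(deliveryId: Long, entityId: String, payload: Any)
final case class Confirm(deliveryId: Long)
final case class MsgSent(entityId: String, payload: Any)
final case class MsgConfirmed(deliveryId: Long)

// resends unconfirmed envelopes to the shard region until the entity replies with Confirm
class ReliableSender(shardRegion: ActorRef) extends PersistentActor with AtLeastOnceDelivery {
  override def persistenceId: String = "reliable-sender"

  override def receiveCommand: Receive = {
    case (entityId: String, payload: Any) =>
      persist(MsgSent(entityId, payload)) { e =>
        deliver(shardRegion.path)(deliveryId => Envelope(deliveryId, e.entityId, e.payload))
      }
    case Confirm(deliveryId) =>
      persist(MsgConfirmed(deliveryId))(e => confirmDelivery(e.deliveryId))
  }

  override def receiveRecover: Receive = {
    case MsgSent(entityId, payload) =>
      deliver(shardRegion.path)(deliveryId => Envelope(deliveryId, entityId, payload))
    case MsgConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)
  }
}
```

The sharded entity has to reply with `Confirm(deliveryId)` for `confirmDelivery` to stop the redelivery.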
|
||||
|
||||
### Overhead
|
||||
|
||||
Some additional latency is introduced for messages targeted to new or previously
|
||||
unused shards due to the round-trip to the coordinator. Rebalancing of shards may
|
||||
also add latency. This should be considered when designing the application specific
|
||||
shard resolution, e.g. to avoid too fine grained shards. Once a shard's location is known
|
||||
the only overhead is sending a message via the `ShardRegion` rather than directly.
|
||||
See @ref:[Cluster Sharding concepts](typed/cluster-sharding-concepts.md) in the documentation of the new APIs.
|
||||
|
||||
<a id="cluster-sharding-mode"></a>
|
||||
## Sharding State Store Mode
|
||||
|
||||
There are two cluster sharding states managed:
|
||||
1. @ref:[ShardCoordinator State](#shardcoordinator-state) - the `Shard` locations
|
||||
|
||||
1. @ref:[ShardCoordinator State](typed/cluster-sharding-concepts.md#shardcoordinator-state) - the `Shard` locations
|
||||
1. @ref:[Remembering Entities](#remembering-entities) - the entities in each `Shard`, which is optional, and disabled by default
|
||||
|
||||
For these, there are currently two modes which define how these states are stored:
|
||||
|
||||
* @ref:[Distributed Data Mode](#distributed-data-mode) - uses Akka @ref:[Distributed Data](distributed-data.md) (CRDTs) (the default)
|
||||
* @ref:[Persistence Mode](#persistence-mode) - (deprecated) uses Akka @ref:[Persistence](persistence.md) (Event Sourcing)
|
||||
|
||||
@@@ warning
|
||||
|
||||
Persistence for state store mode is deprecated.
|
||||
|
||||
@@@
|
||||
@@include[cluster.md](includes/cluster.md) { #sharding-persistence-mode-deprecated }
|
||||
|
||||
Changing the mode requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change).
|
||||
|
||||
### Distributed Data Mode
|
||||
|
||||
This mode is enabled with configuration (enabled by default):
|
||||
|
||||
```
|
||||
akka.cluster.sharding.state-store-mode = ddata
|
||||
```
|
||||
|
||||
The state of the `ShardCoordinator` is replicated across the cluster but is not durable, not stored to disk.
|
||||
The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency.
|
||||
When all nodes in the cluster have been stopped, the state is no longer needed and dropped.
|
||||
|
||||
The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to
|
||||
disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available.
|
||||
|
||||
Cluster Sharding uses its own Distributed Data `Replicator` per node.
|
||||
If using roles with sharding there is one `Replicator` per role, which enables a subset of
|
||||
all nodes for some entity types and another subset for other entity types. Each such replicator has a name
|
||||
that contains the node role and therefore the role configuration must be the same on all nodes in the
|
||||
cluster, for example you can't change the roles when performing a rolling upgrade.
|
||||
Changing roles requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change).
|
||||
|
||||
The settings for Distributed Data are configured in the section
|
||||
`akka.cluster.sharding.distributed-data`. It's not possible to have different
|
||||
`distributed-data` settings for different sharding entity types.
|
||||
See @ref:[Distributed Data mode](typed/cluster-sharding.md#distributed-data-mode) in the documentation of the new APIs.
|
||||
|
||||
### Persistence Mode
|
||||
|
||||
This mode is enabled with configuration:
|
||||
|
||||
```
|
||||
akka.cluster.sharding.state-store-mode = persistence
|
||||
```
|
||||
|
||||
Since it is running in a cluster @ref:[Persistence](persistence.md) must be configured with a distributed journal.
|
||||
|
||||
@@@ note
|
||||
|
||||
Persistence mode for @ref:[Remembering Entities](#remembering-entities) will be replaced by a pluggable data access API with storage implementations.
|
||||
New sharding applications should no longer choose persistence mode. Existing users of persistence mode
|
||||
[can eventually migrate to the replacement options](https://github.com/akka/akka/issues/26177).
|
||||
|
||||
@@@
|
||||
|
||||
## Startup after minimum number of members
|
||||
|
||||
It's good to use Cluster Sharding with the Cluster setting `akka.cluster.min-nr-of-members` or
|
||||
`akka.cluster.role.<role-name>.min-nr-of-members`. That will defer the allocation of the shards
|
||||
until at least that number of regions have been started and registered to the coordinator. This
|
||||
avoids allocating many shards to the first region that registers, only for them to later be
|
||||
rebalanced to other nodes.
|
||||
|
||||
See @ref:[How To Startup when Cluster Size Reached](cluster-usage.md#min-members) for more information about `min-nr-of-members`.
|
||||
See @ref:[Persistence Mode](typed/cluster-sharding.md#persistence-mode) in the documentation of the new APIs.
|
||||
|
||||
## Proxy Only Mode
|
||||
|
||||
|
|
@@ -362,30 +164,20 @@ then supposed to stop itself. Incoming messages will be buffered by the `Shard`
|
|||
between reception of `Passivate` and termination of the entity. Such buffered messages
|
||||
are thereafter delivered to a new incarnation of the entity.
|
||||
|
||||
### Automatic Passivation
|
||||
|
||||
The entities are automatically passivated if they haven't received a message within the duration configured in
|
||||
`akka.cluster.sharding.passivate-idle-entity-after`
|
||||
or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable
|
||||
time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages
|
||||
to the `ActorRef` or messages that the actor sends to itself are not counted in this activity.
|
||||
Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`.
|
||||
It is always disabled if @ref:[Remembering Entities](#remembering-entities) is enabled.
|
||||
See @ref:[Automatic Passivation](typed/cluster-sharding.md#automatic-passivation) in the documentation of the new APIs.
|
||||
|
||||
<a id="cluster-sharding-remembering"></a>
|
||||
## Remembering Entities
|
||||
|
||||
Remembering entities pertains to restarting entities after a rebalance or recovering from a crash.
|
||||
Enabling or disabling (the default) this feature drives the behavior of the restarts:
|
||||
* enabled: entities are restarted, even though no new messages are sent to them
|
||||
* disabled: entities are restarted on demand when a new message arrives
|
||||
See @ref:[Remembering Entities](typed/cluster-sharding.md#remembering-entities) in the documentation of the new APIs,
|
||||
including behavior when enabled and disabled.
|
||||
|
||||
Note that the state of the entities themselves will not be restored unless they have been made persistent,
|
||||
for example with @ref:[Event Sourcing](persistence.md).
|
||||
|
||||
To make the list of entities in each `Shard` persistent (durable), set
|
||||
the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
|
||||
`ClusterSharding.start` and making sure the `shardIdExtractor` handles
|
||||
`ClusterSharding.start` and make sure the `shardIdExtractor` handles
|
||||
`Shard.StartEntity(EntityId)`, which implies that it must be possible to
|
||||
extract a `ShardId` from the `EntityId`.
|
||||
|
||||
|
|
@@ -395,41 +187,6 @@ Scala
|
|||
Java
|
||||
: @@snip [ClusterShardingTest.java](/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity }
|
||||
|
||||
This can also be configured by setting `akka.cluster.sharding.remember-entities = on`.
|
||||
|
||||
The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
|
||||
shards are rebalanced. This cost increases with number of entities per shard, thus it is not
|
||||
recommended with more than 10000 active (non-passivated) entities per shard.
|
||||
|
||||
### Behavior When Enabled
|
||||
|
||||
When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto another
|
||||
node or recovers after a crash it will recreate all the entities which were previously
|
||||
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
||||
sent to the parent of the entity actor, otherwise the entity will be automatically
|
||||
restarted after the entity restart backoff specified in the configuration.
|
||||
|
||||
When [Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are
|
||||
stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
|
||||
configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since
|
||||
the default directory contains the remote port of the actor system. If using a dynamically
|
||||
assigned port (0) it will be different each time and the previously stored data will not
|
||||
be loaded.
|
||||
|
||||
The reason for storing the identifiers of the active entities in durable storage, i.e. stored to
|
||||
disk, is that the same entities should be started also after a complete cluster restart. If this is not needed
|
||||
you can disable durable storage and benefit from better performance by using the following configuration:
|
||||
|
||||
```
|
||||
akka.cluster.sharding.distributed-data.durable.keys = []
|
||||
```
|
||||
|
||||
### Behavior When Not Enabled
|
||||
|
||||
When `rememberEntities` is disabled (the default), a `Shard` will not automatically restart any entities
|
||||
after a rebalance or recovering from a crash. Instead, entities are started once the first message
|
||||
for that entity has been received in the `Shard`.
|
||||
|
||||
## Supervision
|
||||
|
||||
If you need to use another `supervisorStrategy` for the entity actors than the default (restarting) strategy
|
||||
|
|
@@ -468,67 +225,15 @@ graceful leaving process of a cluster member.
|
|||
<a id="removeinternalclustershardingdata"></a>
|
||||
## Removal of Internal Cluster Sharding Data
|
||||
|
||||
The Cluster Sharding `ShardCoordinator` stores locations of the shards.
|
||||
This data can safely be removed when restarting the whole Akka Cluster.
|
||||
Note that this does not include application data.
|
||||
|
||||
There is a utility program `akka.cluster.sharding.RemoveInternalClusterShardingData`
|
||||
that removes this data.
|
||||
|
||||
@@@ warning
|
||||
|
||||
Never use this program while there are running Akka Cluster nodes that are
|
||||
using Cluster Sharding. Stop all Cluster nodes before using this program.
|
||||
|
||||
@@@
|
||||
|
||||
It may be necessary to remove the data if the Cluster Sharding coordinator
|
||||
cannot startup because of corrupt data, which may happen if accidentally
|
||||
two clusters were running at the same time, e.g. caused by using auto-down
|
||||
and there was a network partition.
|
||||
|
||||
@@@ warning
|
||||
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See @ref:[Downing](typed/cluster.md#automatic-vs-manual-downing).
|
||||
|
||||
@@@
|
||||
|
||||
Use this program as a standalone Java main program:
|
||||
|
||||
```
|
||||
java -classpath <jar files, including akka-cluster-sharding>
|
||||
akka.cluster.sharding.RemoveInternalClusterShardingData
|
||||
-2.3 entityType1 entityType2 entityType3
|
||||
```
|
||||
|
||||
The program is included in the `akka-cluster-sharding` jar file. It
|
||||
is easiest to run it with the same classpath and configuration as your ordinary
|
||||
application. It can be run from sbt or Maven in a similar way.
|
||||
|
||||
Specify the entity type names (same as you use in the `start` method
|
||||
of `ClusterSharding`) as program arguments.
|
||||
|
||||
If you specify `-2.3` as the first program argument it will also try
|
||||
to remove data that was stored by Cluster Sharding in Akka 2.3.x using
|
||||
different persistenceId.
|
||||
See @ref:[removal of Internal Cluster Sharding Data](typed/cluster-sharding.md#removal-of-internal-cluster-sharding-data) in the documentation of the new APIs.
|
||||
|
||||
## Configuration
|
||||
|
||||
The `ClusterSharding` extension can be configured with the following properties. These configuration
|
||||
properties are read by the `ClusterShardingSettings` when created with an `ActorSystem` parameter.
|
||||
It is also possible to amend the `ClusterShardingSettings` or create it from another config section
|
||||
with the same layout as below. `ClusterShardingSettings` is a parameter to the `start` method of
|
||||
`ClusterShardingSettings` is a parameter to the `start` method of
|
||||
the `ClusterSharding` extension, i.e. each entity type can be configured with different settings
|
||||
if needed.
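For example, a minimal sketch of amending the settings for one entity type (the `"counter"` role and the surrounding `system` are assumptions for illustration):

```scala
import akka.cluster.sharding.ClusterShardingSettings

// settings read from akka.cluster.sharding, then amended for this entity type;
// the result would be passed to ClusterSharding.start for the "Counter" type only
val counterSettings: ClusterShardingSettings =
  ClusterShardingSettings(system).withRole("counter")
```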
|
||||
|
||||
@@snip [reference.conf](/akka-cluster-sharding/src/main/resources/reference.conf) { #sharding-ext-config }
|
||||
|
||||
A custom shard allocation strategy can be defined in an optional parameter to
|
||||
`ClusterSharding.start`. See the API documentation of @scala[`ShardAllocationStrategy`] @java[`AbstractShardAllocationStrategy`] for details
|
||||
of how to implement a custom shard allocation strategy.
|
||||
See @ref:[configuration](typed/cluster-sharding.md#configuration) for more information.
|
||||
|
||||
## Inspecting cluster sharding state
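As a brief sketch, reusing the hypothetical `counterRegion` from the registration sketch above (in practice these messages would be sent with `ask` so the replies can be received):

```scala
import scala.concurrent.duration._
import akka.cluster.sharding.ShardRegion

// which shards (and how many entities) does this node's region host?
// the reply is a ShardRegion.CurrentShardRegionState
counterRegion ! ShardRegion.GetShardRegionState

// statistics for the whole cluster, collected within the given timeout;
// the reply is a ShardRegion.ClusterShardingStats
counterRegion ! ShardRegion.GetClusterShardingStats(5.seconds)
```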
|
||||
|
||||
|
|
|
|||
|
|
@@ -3,7 +3,7 @@
|
|||
This document describes how to use Akka Cluster and the Cluster APIs using code samples.
|
||||
For specific documentation topics see:
|
||||
|
||||
* @ref:[Cluster Specification](typed/cluster-specification.md)
|
||||
* @ref:[Cluster Specification](typed/cluster-concepts.md)
|
||||
* @ref:[Cluster Membership Service](typed/cluster-membership.md)
|
||||
* @ref:[When and where to use Akka Cluster](typed/choosing-cluster.md)
|
||||
* @ref:[Higher level Cluster tools](#higher-level-cluster-tools)
|
||||
|
|
@@ -39,7 +39,7 @@ It contains instructions on how to run the `SimpleClusterApp`.
|
|||
|
||||
## When and where to use Akka Cluster
|
||||
|
||||
See [Choosing Akka Cluster](typed/choosing-cluster.md#when-and-where-to-use-akka-cluster).
|
||||
See [Choosing Akka Cluster](typed/choosing-cluster.md#when-and-where-to-use-akka-cluster) in the documentation of the new APIs.
|
||||
|
||||
## Cluster API Extension
|
||||
|
||||
|
|
@@ -204,11 +204,15 @@ It contains instructions on how to run the **Worker Dial-in Example** sample.
|
|||
|
||||
## Node Roles
|
||||
|
||||
See @ref:[Cluster Node Roles](typed/cluster.md#node-roles)
|
||||
See @ref:[Cluster Node Roles](typed/cluster.md#node-roles) in the documentation of the new APIs.
|
||||
|
||||
<a id="min-members"></a>
|
||||
## How To Startup when Cluster Size Reached
|
||||
|
||||
See @ref:[How to startup when a minimum number of members in the cluster is reached](typed/cluster.md#how-to-startup-when-a-cluster-size-is-reached) in the documentation of the new APIs.
|
||||
|
||||
## How To Startup when Member is Up
|
||||
|
||||
A common use case is to start actors after the cluster has been initialized,
|
||||
members have joined, and the cluster has reached a certain size.
|
||||
|
||||
|
|
@@ -229,8 +233,6 @@ akka.cluster.role {
|
|||
}
|
||||
```
|
||||
|
||||
## How To Startup when Member is Up
|
||||
|
||||
You can start actors or trigger any functions using the `registerOnMemberUp` callback, which will
|
||||
be invoked when the current member status is changed to 'Up'. This can additionally be used with
|
||||
`akka.cluster.min-nr-of-members` optional configuration to defer an action until the cluster has reached a certain size.
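A minimal sketch of this callback (the `MainActor` and the `ClusterSystem` name are hypothetical):

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.Cluster

object ClusterApp {
  // a hypothetical application entry-point actor
  class MainActor extends Actor {
    def receive: Receive = { case _ => () }
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem")

    // invoked once this member's status has changed to Up, which also respects
    // akka.cluster.min-nr-of-members if that setting is configured
    Cluster(system).registerOnMemberUp {
      system.actorOf(Props[MainActor], "main")
    }
  }
}
```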
|
||||
|
|
@@ -304,7 +306,7 @@ See @ref:[Cluster Metrics](cluster-metrics.md).
|
|||
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
|
||||
unreachable from the rest of the cluster. Please see:
|
||||
|
||||
* @ref:[Failure Detector specification](typed/cluster-specification.md#failure-detector)
|
||||
* @ref:[Failure Detector specification](typed/cluster-concepts.md#failure-detector)
|
||||
* @ref:[Phi Accrual Failure Detector](typed/failure-detector.md) implementation
|
||||
* [Using the Failure Detector](typed/cluster.md#using-the-failure-detector)
|
||||
|
||||
|
|
|
|||
|
|
@@ -658,88 +658,7 @@ Java
|
|||
<a id="ddata-durable"></a>
|
||||
### Durable Storage
|
||||
|
||||
By default the data is only kept in memory. It is redundant since it is replicated to other nodes
|
||||
in the cluster, but if you stop all nodes the data is lost, unless you have saved it
|
||||
elsewhere.
|
||||
|
||||
Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded
|
||||
next time the replicator is started, i.e. when the actor system is restarted. This means data will survive as
|
||||
long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries
|
||||
are configured with:
|
||||
|
||||
```
|
||||
akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]
|
||||
```
|
||||
|
||||
Prefix matching is supported by using `*` at the end of a key.
|
||||
|
||||
All entries can be made durable by specifying:
|
||||
|
||||
```
|
||||
akka.cluster.distributed-data.durable.keys = ["*"]
|
||||
```
|
||||
|
||||
@scala[[LMDB](https://symas.com/products/lightning-memory-mapped-database/)]@java[[LMDB](https://github.com/lmdbjava/lmdbjava/)] is the default storage implementation. It is
|
||||
possible to replace that with another implementation by implementing the actor protocol described in
|
||||
`akka.cluster.ddata.DurableStore` and defining the `akka.cluster.distributed-data.durable.store-actor-class`
|
||||
property for the new implementation.
|
||||
|
||||
The location of the files for the data is configured with:
|
||||
|
||||
Scala
|
||||
: ```
|
||||
# Directory of LMDB file. There are two options:
|
||||
# 1. A relative or absolute path to a directory that ends with 'ddata'
|
||||
# the full name of the directory will contain name of the ActorSystem
|
||||
# and its remote port.
|
||||
# 2. Otherwise the path is used as is, as a relative or absolute path to
|
||||
# a directory.
|
||||
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
|
||||
```
|
||||
|
||||
Java
|
||||
: ```
|
||||
# Directory of LMDB file. There are two options:
|
||||
# 1. A relative or absolute path to a directory that ends with 'ddata'
|
||||
# the full name of the directory will contain name of the ActorSystem
|
||||
# and its remote port.
|
||||
# 2. Otherwise the path is used as is, as a relative or absolute path to
|
||||
# a directory.
|
||||
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
|
||||
```
|
||||
|
||||
|
||||
When running in production you may want to configure the directory to a specific
|
||||
path (alt 2), since the default directory contains the remote port of the
|
||||
actor system to make the name unique. If using a dynamically assigned
|
||||
port (0) it will be different each time and the previously stored data
|
||||
will not be loaded.
|
||||
|
||||
Making the data durable has a performance cost. By default, each update is flushed
|
||||
to disk before the `UpdateSuccess` reply is sent. For better performance, but with the risk of losing
|
||||
the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
|
||||
a time period before they are written to LMDB and flushed to disk. Enabling write behind is especially
|
||||
efficient when performing many writes to the same key, because it is only the last value for each key
|
||||
that will be serialized and stored. The risk of losing writes if the JVM crashes is small since the
|
||||
data is typically replicated to other nodes immediately according to the given `WriteConsistency`.
|
||||
|
||||
```
|
||||
akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms
|
||||
```
|
||||
|
||||
Note that you should be prepared to receive `WriteFailure` as reply to an `Update` of a
|
||||
durable entry if the data could not be stored for some reason. When enabling `write-behind-interval`
|
||||
such errors will only be logged and `UpdateSuccess` will still be the reply to the `Update`.
|
||||
|
||||
There is one important caveat when it comes to pruning of [CRDT Garbage](#crdt-garbage) for durable data.
|
||||
If an old data entry that was never pruned is injected and merged with existing data after
|
||||
the pruning markers have been removed, the value will not be correct. The time-to-live
|
||||
of the markers is defined by the configuration
`akka.cluster.distributed-data.durable.remove-pruning-marker-after` and is on the order of days.
|
||||
This would be possible if a node with durable data didn't participate in the pruning
|
||||
(e.g. it was shut down) and later started after this time. A node with durable data should not
|
||||
be stopped for a longer time than this duration and if it is joining again after this
|
||||
duration its data should first be manually removed (from the lmdb directory).
|
||||
See @ref:[Durable Storage](typed/distributed-data.md#durable-storage) in the documentation of the new APIs.
|
||||
|
||||
### CRDT Garbage
|
||||
|
||||
|
|
|
|||
|
|
@@ -48,3 +48,11 @@ at startup by using some external tool or API. When joining to seed nodes you sh
|
|||
the node itself except for the node that is supposed to be the first seed node, which should be
|
||||
placed first in the parameter to the programmatic join.
|
||||
<!--- #join-seeds-programmatic --->
|
||||
|
||||
<!--- #sharding-persistence-mode-deprecated --->
|
||||
@@@ warning
|
||||
|
||||
Persistence for state store mode is deprecated.
|
||||
|
||||
@@@
|
||||
<!--- #sharding-persistence-mode-deprecated --->
|
||||
|
|
|
|||
|
|
@@ -41,7 +41,7 @@ implement manually.
|
|||
@@@
|
||||
|
||||
@@@ warning { title=IMPORTANT }
|
||||
Use stream refs with Akka Cluster. The [failure detector can cause quarantining](../typed/cluster-specification.md#quarantined) if plain Akka remoting is used.
|
||||
Use stream refs with Akka Cluster. The [failure detector can cause quarantining](../typed/cluster-concepts.md#quarantined) if plain Akka remoting is used.
|
||||
@@@
|
||||
|
||||
## Stream References
|
||||
|
|
|
|||
|
|
@@ -1,8 +1,8 @@
|
|||
# Cluster Membership Service
|
||||
|
||||
The core of Akka Cluster is the cluster membership, to keep track of what nodes are part of the cluster and
|
||||
their health. Cluster membership is communicated using @ref:[gossip](cluster-specification.md#gossip) and
|
||||
@ref:[failure detection](cluster-specification.md#failure-detector).
|
||||
their health. Cluster membership is communicated using @ref:[gossip](cluster-concepts.md#gossip) and
|
||||
@ref:[failure detection](cluster-concepts.md#failure-detector).
|
||||
|
||||
There are several @ref:[Higher level Cluster tools](../typed/cluster.md#higher-level-cluster-tools) that are built
|
||||
on top of the cluster membership service.
|
||||
|
|
|
|||
130
akka-docs/src/main/paradox/typed/cluster-sharding-concepts.md
Normal file
|
|
@@ -0,0 +1,130 @@
|
|||
# Cluster Sharding concepts
|
||||
|
||||
The `ShardRegion` actor is started on each node in the cluster, or group of nodes
|
||||
tagged with a specific role. The `ShardRegion` is created with two application specific
|
||||
functions to extract the entity identifier and the shard identifier from incoming messages.
|
||||
A `Shard` is a group of entities that will be managed together. For the first message in a
|
||||
specific shard the `ShardRegion` requests the location of the shard from a central coordinator,
|
||||
the `ShardCoordinator`.
|
||||
|
||||
The `ShardCoordinator` decides which `ShardRegion` shall own the `Shard` and informs
|
||||
that `ShardRegion`. The region will confirm this request and create the `Shard` supervisor
|
||||
as a child actor. The individual `Entities` will then be created when needed by the `Shard`
|
||||
actor. Incoming messages thus travel via the `ShardRegion` and the `Shard` to the target
|
||||
`Entity`.
|
||||
|
||||
If the shard home is another `ShardRegion` instance messages will be forwarded
|
||||
to that `ShardRegion` instance instead. While resolving the location of a
|
||||
shard incoming messages for that shard are buffered and later delivered when the
|
||||
shard home is known. Subsequent messages to the resolved shard can be delivered
|
||||
to the target destination immediately without involving the `ShardCoordinator`.
|
||||
|
||||
### Scenarios
|
||||
|
||||
Once a `Shard` location is known `ShardRegion`s send messages directly. Here are the
|
||||
scenarios for getting to this state. In the scenarios the following notation is used:
|
||||
|
||||
* `SC` - ShardCoordinator
|
||||
* `M#` - Message 1, 2, 3, etc
|
||||
* `SR#` - ShardRegion 1, 2, 3, etc
* `S#` - Shard 1, 2, 3, etc
* `E#` - Entity 1, 2, 3, etc. An entity refers to an Actor managed by Cluster Sharding.
|
||||
|
||||
Where `#` is a number to distinguish between instances as there are multiple in the Cluster.
|
||||
|
||||
#### Scenario 1: Message to an unknown shard that belongs to the local ShardRegion
|
||||
|
||||
1. Incoming message `M1` to `ShardRegion` instance `SR1`.
|
||||
2. `M1` is mapped to shard `S1`. `SR1` doesn't know about `S1`, so it asks the `SC` for the location of `S1`.
|
||||
3. `SC` answers that the home of `S1` is `SR1`.
|
||||
4. `SR1` creates child actor shard `S1` and forwards the message to it.
|
||||
5. `S1` creates child actor for `E1` and forwards the message to it.
|
||||
6. All incoming messages for `S1` which arrive at `SR1` can be handled by `SR1` without `SC`.
|
||||
|
||||
#### Scenario 2: Message to an unknown shard that belongs to a remote ShardRegion
|
||||
|
||||
1. Incoming message `M2` to `ShardRegion` instance `SR1`.
|
||||
2. `M2` is mapped to `S2`. `SR1` doesn't know about `S2`, so it asks `SC` for the location of `S2`.
|
||||
3. `SC` answers that the home of `S2` is `SR2`.
|
||||
4. `SR1` sends buffered messages for `S2` to `SR2`.
|
||||
5. All incoming messages for `S2` which arrive at `SR1` can be handled by `SR1` without `SC`. It forwards messages to `SR2`.
|
||||
6. `SR2` receives the message for `S2`, asks `SC`, which answers that the home of `S2` is `SR2`, and we are in Scenario 1 (but for `SR2`).
|
||||
|
||||
### Shard location
|
||||
|
||||
To make sure that at most one instance of a specific entity actor is running somewhere
|
||||
in the cluster it is important that all nodes have the same view of where the shards
|
||||
are located. Therefore the shard allocation decisions are taken by the central
|
||||
`ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on
|
||||
the oldest member among all cluster nodes or a group of nodes tagged with a specific
|
||||
role.
|
||||
|
||||
The logic that decides where a shard is to be located is defined in a
|
||||
pluggable @ref:[shard allocation strategy](cluster-sharding.md#shard-allocation-strategy).
|
||||
|
||||
### Shard rebalancing
|
||||
|
||||
To be able to use newly added members in the cluster the coordinator facilitates rebalancing
|
||||
of shards, i.e. migrate entities from one node to another. In the rebalance process the
|
||||
coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started.
|
||||
That means they will start buffering incoming messages for that shard, in the same way as if the
|
||||
shard location is unknown. During the rebalance process the coordinator will not answer any
|
||||
requests for the location of shards that are being rebalanced, i.e. local buffering will
|
||||
continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard
|
||||
will stop all entities in that shard by sending the specified `stopMessage`
|
||||
(default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion`
|
||||
owning the entities will acknowledge the handoff as completed to the coordinator.
|
||||
Thereafter the coordinator will reply to requests for the location of
|
||||
the shard, thereby allocating a new home for the shard, and then buffered messages in the
|
||||
`ShardRegion` actors are delivered to the new location. This means that the state of the entities
|
||||
is not transferred or migrated. If the state of the entities is of importance it should be
|
||||
persistent (durable), e.g. with @ref:[Persistence](persistence.md) (or see @ref:[Classic Persistence](../persistence.md)), so that it can be recovered at the new
|
||||
location.
|
||||
|
||||
The logic that decides which shards to rebalance is defined in a pluggable shard
|
||||
allocation strategy. The default implementation `ShardCoordinator.LeastShardAllocationStrategy`
|
||||
picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
|
||||
They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
|
||||
i.e. new members in the cluster.
|
||||
|
||||
For the `LeastShardAllocationStrategy` there is a configurable threshold (`rebalance-threshold`) of
|
||||
how large the difference must be to begin the rebalancing. The difference between number of shards in
|
||||
the region with most shards and the region with least shards must be greater than the `rebalance-threshold`
|
||||
for the rebalance to occur.
|
||||
|
||||
A `rebalance-threshold` of 1 gives the best distribution and therefore typically the best choice.
|
||||
A higher threshold means that more shards can be rebalanced at the same time instead of one-by-one.
|
||||
That has the advantage that the rebalance process can be quicker but has the drawback that the
|
||||
number of shards (and therefore load) between different nodes may be significantly different.
|
||||
|
||||
### ShardCoordinator state
|
||||
|
||||
The state of shard locations in the `ShardCoordinator` is persistent (durable) with
|
||||
@ref:[Distributed Data](distributed-data.md) (or see @ref:[Classic Distributed Data](../distributed-data.md)) to survive failures.
|
||||
|
||||
When a crashed or
|
||||
unreachable coordinator node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
|
||||
actor will take over and the state is recovered. During such a failure period shards
|
||||
with a known location are still available, while messages for new (unknown) shards
|
||||
are buffered until the new `ShardCoordinator` becomes available.
|
||||
|
||||
### Message ordering
|
||||
|
||||
As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity
|
||||
actor the order of the messages is preserved. As long as the buffer limit is not reached
|
||||
messages are delivered on a best-effort basis, with at-most-once delivery semantics,
|
||||
in the same way as ordinary message sending.
|
||||
|
||||
#### AtLeastOnceDelivery
|
||||
|
||||
Reliable end-to-end messaging, with at-least-once semantics, can be added by using
`AtLeastOnceDelivery` from @ref:[Classic Persistence](../persistence.md#at-least-once-delivery);
see also @github[#20984](#20984) about `AtLeastOnceDelivery`, including redelivery with a backoff.
|
||||
|
||||
### Overhead
|
||||
|
||||
Some additional latency is introduced for messages targeted to new or previously
|
||||
unused shards due to the round-trip to the coordinator. Rebalancing of shards may
|
||||
also add latency. This should be considered when designing the application specific
|
||||
shard resolution, e.g. to avoid too fine grained shards. Once a shard's location is known
|
||||
the only overhead is sending a message via the `ShardRegion` rather than directly.
|
||||
|
|
@@ -1,8 +1,14 @@
|
|||
# Cluster Sharding
|
||||
|
||||
@@@ note
|
||||
|
||||
For the Akka Classic API see @ref:[Classic Cluster Sharding](../cluster-sharding.md)
|
||||
|
||||
@@@
|
||||
|
||||
## Dependency
|
||||
|
||||
To use Akka Cluster Sharding Typed, you must add the following dependency in your project:
|
||||
To use Akka Cluster Sharding, you must add the following dependency in your project:
|
||||
|
||||
@@dependency[sbt,Maven,Gradle] {
|
||||
group=com.typesafe.akka
|
||||
|
|
@@ -12,8 +18,36 @@ To use Akka Cluster Sharding Typed, you must add the following dependency in you
|
|||
|
||||
## Introduction
|
||||
|
||||
For an introduction to Sharding concepts see @ref:[Cluster Sharding](../cluster-sharding.md). This documentation shows how to use the typed
|
||||
Cluster Sharding API.
|
||||
Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to
|
||||
be able to interact with them using their logical identifier, but without having to care about
|
||||
their physical location in the cluster, which might also change over time.
|
||||
|
||||
It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology.
|
||||
Here we call these actors "entities". These actors typically have persistent (durable) state,
|
||||
but this feature is not limited to actors with persistent state.
|
||||
|
||||
Cluster sharding is typically used when you have many stateful actors that together consume
|
||||
more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
|
||||
it might be easier to run them on a @ref:[Cluster Singleton](cluster-singleton.md) node.
|
||||
|
||||
In this context sharding means that actors with an identifier, so called entities,
|
||||
can be automatically distributed across multiple nodes in the cluster. Each entity
|
||||
actor runs only at one place, and messages can be sent to the entity without requiring
|
||||
the sender to know the location of the destination actor. This is achieved by sending
|
||||
the messages via a `ShardRegion` actor provided by this extension, which knows how
|
||||
to route the message with the entity id to the final destination.
|
||||
|
||||
Cluster sharding will not be active on members with status @ref:[WeaklyUp](cluster-membership.md#weaklyup-members)
|
||||
if that feature is enabled.
|
||||
|
||||
@@@ warning
|
||||
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See @ref:[Downing](cluster.md#automatic-vs-manual-downing).
|
||||
|
||||
@@@
|
||||
|
||||
## Basic example
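As a rough sketch of such an example, assuming the Akka 2.6 typed Cluster Sharding API (the `Counter` entity, its message protocol and the entity id are illustrative):

```scala
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityRef, EntityTypeKey }

object Counter {
  sealed trait Command
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Int]) extends Command

  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("Counter")

  def apply(entityId: String): Behavior[Command] = counter(entityId, 0)

  private def counter(entityId: String, value: Int): Behavior[Command] =
    Behaviors.receiveMessage {
      case Increment =>
        counter(entityId, value + 1)
      case GetValue(replyTo) =>
        replyTo ! value
        Behaviors.same
    }
}

object CounterSharding {
  def init(system: ActorSystem[_]): Unit = {
    val sharding = ClusterSharding(system)

    // register the Counter entity type; sharding starts entities on demand
    sharding.init(Entity(Counter.TypeKey)(createBehavior = entityContext => Counter(entityContext.entityId)))

    // an EntityRef routes messages to the right node, shard and entity
    val counterOne: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, "counter-1")
    counterOne ! Counter.Increment
  }
}
```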
|
||||
|
||||
|
|
@@ -87,6 +121,82 @@ is used but `tell` or any of the other @ref:[Interaction Patterns](interaction-p
|
|||
|
||||
See @ref:[persistence](persistence.md) for more details.
|
||||
|
||||
## Shard allocation strategy
|
||||
|
||||
The default implementation `akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy`
|
||||
allocates new shards to the `ShardRegion` with least number of previously allocated shards.
|
||||
This strategy can be replaced by an application specific implementation.
|
||||
|
||||
An optional custom shard allocation strategy can be passed into the optional parameter when initializing an entity type
|
||||
or explicitly using the `withAllocationStrategy` function.
|
||||
See the API documentation of @scala[`akka.cluster.sharding.ShardAllocationStrategy`]@java[`akka.cluster.sharding.AbstractShardAllocationStrategy`] for details
|
||||
of how to implement a custom `ShardAllocationStrategy`.
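A sketch of a custom strategy, written against the classic `ShardCoordinator.ShardAllocationStrategy` SPI that the typed API also accepts (it allocates new shards to the region with the fewest shards and never rebalances; a teaching sketch, not a production-ready strategy):

```scala
import scala.collection.immutable
import scala.concurrent.Future
import akka.actor.ActorRef
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.ShardId

class FewestShardsAllocationStrategy extends ShardAllocationStrategy {

  // pick the region that currently hosts the fewest shards
  override def allocateShard(
      requester: ActorRef,
      shardId: ShardId,
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
    val (regionWithFewestShards, _) =
      currentShardAllocations.minBy { case (_, shards) => shards.size }
    Future.successful(regionWithFewestShards)
  }

  // never hand off any shards
  override def rebalance(
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
      rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
    Future.successful(Set.empty)
}
```

Such a strategy would then be passed via `withAllocationStrategy` when initializing the entity type, as mentioned above.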
|
||||
|
||||
## How it works
|
||||
|
||||
See @ref:[Cluster Sharding concepts](cluster-sharding-concepts.md).
|
||||
|
||||
## Sharding State Store Mode
|
||||
|
||||
There are two cluster sharding states managed:
|
||||
|
||||
1. @ref:[ShardCoordinator State](cluster-sharding-concepts.md#shardcoordinator-state) - the `Shard` locations
|
||||
1. @ref:[Remembering Entities](#remembering-entities) - the entities in each `Shard`, which is optional, and disabled by default
|
||||
|
||||
For these, there are currently two modes which define how these states are stored:
|
||||
|
||||
* @ref:[Distributed Data Mode](#distributed-data-mode) - uses Akka @ref:[Distributed Data](distributed-data.md) (CRDTs) (the default)
|
||||
* @ref:[Persistence Mode](#persistence-mode) - (deprecated) uses Akka @ref:[Persistence](persistence.md) (Event Sourcing)
|
||||
|
||||
@@include[cluster.md](../includes/cluster.md) { #sharding-persistence-mode-deprecated }
|
||||
|
||||
Changing the mode requires @ref:[a full cluster restart](../additional/rolling-updates.md#cluster-sharding-configuration-change).
|
||||
|
||||
### Distributed Data Mode
|
||||
|
||||
This mode is enabled with configuration (enabled by default):
|
||||
|
||||
```
|
||||
akka.cluster.sharding.state-store-mode = ddata
|
||||
```
|
||||
|
||||
The state of the `ShardCoordinator` is replicated across the cluster but is not durable, not stored to disk.
|
||||
The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency.
|
||||
When all nodes in the cluster have been stopped, the state is no longer needed and dropped.
|
||||
|
||||
The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to
|
||||
disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available.
|
||||
|
||||
Cluster Sharding uses its own Distributed Data `Replicator` per node.
|
||||
If using roles with sharding there is one `Replicator` per role, which enables a subset of
|
||||
all nodes for some entity types and another subset for other entity types. Each such replicator has a name
|
||||
that contains the node role and therefore the role configuration must be the same on all nodes in the
|
||||
cluster, for example you can't change the roles when performing a rolling upgrade.
|
||||
Changing roles requires @ref:[a full cluster restart](../additional/rolling-updates.md#cluster-sharding-configuration-change).
|
||||
|
||||
The settings for Distributed Data are configured in the section
|
||||
`akka.cluster.sharding.distributed-data`. It's not possible to have different
|
||||
`distributed-data` settings for different sharding entity types.
|
||||
|
||||
### Persistence Mode
|
||||
|
||||
This mode is enabled with configuration:
|
||||
|
||||
```
|
||||
akka.cluster.sharding.state-store-mode = persistence
|
||||
```
|
||||
|
||||
Since it is running in a cluster @ref:[Persistence](persistence.md) must be configured with a distributed journal.
|
||||
|
||||
@@@ note
|
||||
|
||||
Persistence mode for @ref:[Remembering Entities](#remembering-entities) will be replaced by a pluggable data access API with storage implementations,
|
||||
see @github[#27763](#27763).
|
||||
New sharding applications should no longer choose persistence mode. Existing users of persistence mode
|
||||
[can eventually migrate to the replacement options](https://github.com/akka/akka/issues/26177).
|
||||
|
||||
@@@
|
||||
|
||||
## Passivation
|
||||
|
||||
If the state of the entities are persistent you may stop entities that are not used to
|
||||
|
|
@@ -94,7 +204,7 @@ reduce memory consumption. This is done by the application specific implementati
|
|||
the entity actors for example by defining receive timeout (`context.setReceiveTimeout`).
|
||||
If a message is already enqueued to the entity when it stops itself the enqueued message
|
||||
in the mailbox will be dropped. To support graceful passivation without losing such
|
||||
messages the entity actor can send `ClusterSharding.Passivate` to to the
|
||||
messages the entity actor can send `ClusterSharding.Passivate` to the
|
||||
@scala[`ActorRef[ShardCommand]`]@java[`ActorRef<ShardCommand>`] that was passed in to
|
||||
the factory method when creating the entity. The optional `stopMessage` message
|
||||
will be sent back to the entity, which is then supposed to stop itself, otherwise it will
|
||||
|
|
@@ -125,10 +235,129 @@ message if the entity needs to perform some asynchronous cleanup or interactions
|
|||
|
||||
The entities are automatically passivated if they haven't received a message within the duration configured in
|
||||
`akka.cluster.sharding.passivate-idle-entity-after`
|
||||
or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable
|
||||
or by explicitly setting the `passivateIdleEntityAfter` flag on `ClusterShardingSettings` to a suitable
|
||||
time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages
|
||||
to the `ActorRef` or messages that the actor sends to itself are not counted in this activity.
|
||||
Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`.
|
||||
It is always disabled if @ref:[Remembering Entities](../cluster-sharding.md#remembering-entities) is enabled.
|
||||
It is disabled automatically if @ref:[Remembering Entities](#remembering-entities) is enabled.
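A minimal sketch of adjusting (or disabling) this via configuration (the 120 second value, empty guardian and system name are assumptions for illustration):

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory

val config = ConfigFactory.parseString("""
  akka.cluster.sharding.passivate-idle-entity-after = 120s
  # or disable automatic passivation entirely:
  # akka.cluster.sharding.passivate-idle-entity-after = off
  """).withFallback(ConfigFactory.load())

val system = ActorSystem(Behaviors.empty, "ClusterSystem", config)
```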
|
||||
|
||||
## Remembering Entities
|
||||
|
||||
Remembering entities pertains to restarting entities after a rebalance or recovering from a crash.
|
||||
Enabling or disabling (the default) this feature drives the behavior of the restarts:
|
||||
|
||||
* enabled: entities are restarted, even though no new messages are sent to them. This also automatically disables @ref:[Passivation](#passivation).
|
||||
* disabled: entities are restarted on demand when a new message arrives.
|
||||
|
||||
Note that the state of the entities themselves will not be restored unless they have been made persistent,
|
||||
for example with @ref:[Event Sourcing](persistence.md).
|
||||
|
||||
To make the list of entities in each `Shard` persistent (durable):
|
||||
|
||||
1. set the `rememberEntities` flag to true in `ClusterShardingSettings` when
|
||||
starting a shard region (or its proxy) for a given `entity` type,
|
||||
or configure `akka.cluster.sharding.remember-entities = on`,
|
||||
1. make sure it is possible to extract the ID of the sharded entity from the message (see the sketch below).
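A sketch of the first option, assuming the typed `ClusterShardingSettings` exposes the same `withRememberEntities` setter as the classic settings (the `Counter` entity is the one from the basic example above):

```scala
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }

// assumption: withRememberEntities mirrors the classic settings API;
// alternatively set akka.cluster.sharding.remember-entities = on in configuration
val settings = ClusterShardingSettings(system).withRememberEntities(true)

ClusterSharding(system).init(
  Entity(Counter.TypeKey)(entityContext => Counter(entityContext.entityId))
    .withSettings(settings))
```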
|
||||
|
||||
The performance cost of `rememberEntities` is rather high when starting/stopping entities and when
|
||||
shards are rebalanced. This cost increases with number of entities per shard, thus it is not
|
||||
recommended with more than 10000 active entities per shard.
|
||||
|
||||
### Behavior When Enabled
|
||||
|
||||
When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto another
|
||||
node or recovers after a crash, it will recreate all the entities that were previously
|
||||
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
||||
sent to the parent of the entity actor, otherwise the entity will be automatically
|
||||
restarted after the entity restart backoff specified in the configuration.
|
||||
|
||||
When [Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are
|
||||
stored in @ref:[Durable Storage](distributed-data.md#durable-storage) of Distributed Data. You may want to change the
|
||||
configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since
|
||||
the default directory contains the remote port of the actor system. If using a dynamically
|
||||
assigned port (0) it will be different each time and the previously stored data will not
|
||||
be loaded.
|
||||
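For example (the directory path is just an illustration):

```
akka.cluster.sharding.distributed-data.durable.lmdb.dir = "/var/lib/my-app/sharding-ddata"
```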
|
||||
The reason for storing the identifiers of the active entities in durable storage, i.e. stored to
|
||||
disk, is that the same entities should also be started after a complete cluster restart. If this is not needed
|
||||
you can disable durable storage and benefit from better performance by using the following configuration:
|
||||
|
||||
```
|
||||
akka.cluster.sharding.distributed-data.durable.keys = []
|
||||
```
|
||||
|
||||
### Behavior When Not Enabled
|
||||
|
||||
When `rememberEntities` is disabled (the default), a `Shard` will not automatically restart any entities
|
||||
after a rebalance or after recovering from a crash. Instead, entities are started once the first message
|
||||
for that entity has been received in the `Shard`.
|
||||
|
||||
### Startup after minimum number of members
|
||||
|
||||
It's recommended to use Cluster Sharding with the Cluster setting `akka.cluster.min-nr-of-members` or
|
||||
`akka.cluster.role.<role-name>.min-nr-of-members`. `min-nr-of-members` will defer the allocation of the shards
|
||||
until at least that number of regions have been started and registered to the coordinator. This
|
||||
avoids the situation where many shards are allocated to the first region that registers, only to later be
|
||||
rebalanced to other nodes.
|
||||
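For example, if all shard regions run on nodes with a (hypothetical) `shard-region` role, shard allocation can be deferred until two such nodes have registered:

```
akka.cluster.role.shard-region.min-nr-of-members = 2
```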
|
||||
See @ref:[How To Startup when Cluster Size Reached](cluster.md#how-to-startup-when-a-cluster-size-is-reached)
|
||||
for more information about `min-nr-of-members`.
|
||||
|
||||
## Removal of internal Cluster Sharding data
|
||||
|
||||
Removal of internal Cluster Sharding data is only relevant for "Persistent Mode".
|
||||
The Cluster Sharding `ShardCoordinator` stores locations of the shards.
|
||||
This data can safely be removed when restarting the whole Akka Cluster.
|
||||
Note that this does not include application data.
|
||||
|
||||
There is a utility program `akka.cluster.sharding.RemoveInternalClusterShardingData`
|
||||
that removes this data.
|
||||
|
||||
@@@ warning
|
||||
|
||||
Never use this program while there are running Akka Cluster nodes that are
|
||||
using Cluster Sharding. Stop all Cluster nodes before using this program.
|
||||
|
||||
@@@
|
||||
|
||||
It may be necessary to remove the data if the Cluster Sharding coordinator
|
||||
cannot start up because of corrupt data, which may happen if
|
||||
two clusters were accidentally running at the same time, e.g. caused by using auto-downing
|
||||
during a network partition.
|
||||
|
||||
@@@ warning
|
||||
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See @ref:[Downing](cluster.md#automatic-vs-manual-downing).
|
||||
|
||||
@@@
|
||||
|
||||
Use this program as a standalone Java main program:
|
||||
|
||||
```
|
||||
java -classpath <jar files, including akka-cluster-sharding>
|
||||
akka.cluster.sharding.RemoveInternalClusterShardingData
|
||||
-2.3 entityType1 entityType2 entityType3
|
||||
```
|
||||
|
||||
The program is included in the `akka-cluster-sharding` jar file. It
|
||||
is easiest to run it with the same classpath and configuration as your ordinary
|
||||
application. It can be run from sbt or Maven in a similar way, as sketched below.
|
||||
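For example, with sbt something along these lines could work from the application project (`ShoppingCart` is a hypothetical entity type name):

```
sbt "runMain akka.cluster.sharding.RemoveInternalClusterShardingData ShoppingCart"
```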
|
||||
Specify the entity type names (same as you use in the `start` method
|
||||
of `ClusterSharding`) as program arguments.
|
||||
|
||||
If you specify `-2.3` as the first program argument it will also try
|
||||
to remove data that was stored by Cluster Sharding in Akka 2.3.x using
|
||||
a different `persistenceId`.
|
||||
|
||||
## Configuration
|
||||
|
||||
The `ClusterSharding` extension can be configured with the following properties. These configuration
|
||||
properties are read by the `ClusterShardingSettings` when created with an ActorSystem parameter.
|
||||
It is also possible to amend the `ClusterShardingSettings` or create it from another config section
|
||||
with the same layout as below.
|
||||
|
||||
@@snip [reference.conf](/akka-cluster-sharding/src/main/resources/reference.conf) { #sharding-ext-config }
|
||||
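As a sketch, the settings can also be amended programmatically before initializing an entity. `withRole` and `withPassivateIdleEntityAfter` are assumed here to be the copy methods corresponding to the config properties, the role name is made up, and `Counter` is the hypothetical entity from the sketches above:

```scala
import scala.concurrent.duration._
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity }

val settings = ClusterShardingSettings(system) // reads akka.cluster.sharding from the ActorSystem config
  .withRole("shard-region")                    // hypothetical role name
  .withPassivateIdleEntityAfter(10.minutes)    // assumed method name, mirroring passivate-idle-entity-after

ClusterSharding(system).init(
  Entity(Counter.TypeKey)(entityContext => Counter(entityContext.shard, entityContext.entityId))
    .withSettings(settings))
```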
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
This document describes how to use Akka Cluster and the Cluster APIs.
|
||||
For specific documentation topics see:
|
||||
|
||||
* @ref:[Cluster Specification](cluster-specification.md)
|
||||
* @ref:[Cluster Specification](cluster-concepts.md)
|
||||
* @ref:[Cluster Membership Service](cluster-membership.md)
|
||||
* @ref:[When and where to use Akka Cluster](choosing-cluster.md)
|
||||
* @ref:[Higher level Cluster tools](#higher-level-cluster-tools)
|
||||
|
|
@ -328,7 +328,7 @@ The roles are part of the membership information in `MemberEvent` that you can s
|
|||
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
|
||||
unreachable from the rest of the cluster. Please see:
|
||||
|
||||
* @ref:[Failure Detector specification](cluster-specification.md#failure-detector)
|
||||
* @ref:[Failure Detector specification](cluster-concepts.md#failure-detector)
|
||||
* @ref:[Phi Accrual Failure Detector](failure-detector.md) implementation
|
||||
* [Using the Failure Detector](#using-the-failure-detector)
|
||||
|
||||
|
|
@ -361,6 +361,28 @@ There are several configuration properties for the cluster. Refer to the
|
|||
@ref:[reference configuration](../general/configuration.md#config-akka-cluster) for full
|
||||
configuration descriptions, default values and options.
|
||||
|
||||
### How To Startup when a Cluster size is reached
|
||||
|
||||
A common use case is to start actors after the cluster has been initialized,
|
||||
members have joined, and the cluster has reached a certain size.
|
||||
|
||||
With a configuration option you can define the required number of members
|
||||
before the leader changes the member status of 'Joining' members to 'Up':
|
||||
|
||||
```
|
||||
akka.cluster.min-nr-of-members = 3
|
||||
```
|
||||
|
||||
In a similar way you can define the required number of members of a certain role
|
||||
before the leader changes the member status of 'Joining' members to 'Up':
|
||||
|
||||
```
|
||||
akka.cluster.role {
|
||||
frontend.min-nr-of-members = 1
|
||||
backend.min-nr-of-members = 2
|
||||
}
|
||||
```
|
||||
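To act on this from code, one option is to let an actor subscribe to the typed `SelfUp` cluster event, which is only delivered once the leader has moved this member to 'Up' and thus, with `min-nr-of-members`, once enough members have joined. A minimal sketch, with an illustrative `Bootstrap` actor:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.{ Cluster, SelfUp, Subscribe }

object Bootstrap {
  def apply(): Behavior[SelfUp] =
    Behaviors.setup[SelfUp] { context =>
      Cluster(context.system).subscriptions ! Subscribe(context.self, classOf[SelfUp])
      Behaviors.receiveMessage { _ =>
        // min-nr-of-members has been reached and this member is 'Up':
        // start the application actors here
        Behaviors.same
      }
    }
}
```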
|
||||
### Cluster Info Logging
|
||||
|
||||
You can silence the logging of cluster events at info level with the configuration property:
|
||||
|
|
|
|||
|
|
@ -111,3 +111,87 @@ actor ref. All such `Replicator`s must run on the same path in the classic actor
|
|||
A standalone `ReplicatorMessageAdapter` can also be created for a given `Replicator` instead of creating
|
||||
one via the `DistributedData` extension.
|
||||
|
||||
## Durable Storage
|
||||
|
||||
By default the data is only kept in memory. It is redundant since it is replicated to other nodes
|
||||
in the cluster, but if you stop all nodes the data is lost, unless you have saved it
|
||||
elsewhere.
|
||||
|
||||
Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded
|
||||
the next time the replicator is started, i.e. when the actor system is restarted. This means the data will survive as
|
||||
long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries
|
||||
are configured with:
|
||||
|
||||
```
|
||||
akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]
|
||||
```
|
||||
|
||||
Prefix matching is supported by using `*` at the end of a key.
|
||||
|
||||
All entries can be made durable by specifying:
|
||||
|
||||
```
|
||||
akka.cluster.distributed-data.durable.keys = ["*"]
|
||||
```
|
||||
|
||||
@scala[[LMDB](https://symas.com/products/lightning-memory-mapped-database/)]@java[[LMDB](https://github.com/lmdbjava/lmdbjava/)] is the default storage implementation. It is
|
||||
possible to replace that with another implementation by implementing the actor protocol described in
|
||||
`akka.cluster.ddata.DurableStore` and defining the `akka.cluster.distributed-data.durable.store-actor-class`
|
||||
property for the new implementation.
|
||||
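For example (`com.example.MyDurableStore` is a made-up class name for a custom implementation):

```
akka.cluster.distributed-data.durable.store-actor-class = "com.example.MyDurableStore"
```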
|
||||
The location of the files for the data is configured with:
|
||||
|
||||
Scala
|
||||
: ```
|
||||
# Directory of LMDB file. There are two options:
|
||||
# 1. A relative or absolute path to a directory that ends with 'ddata'
|
||||
# the full name of the directory will contain name of the ActorSystem
|
||||
# and its remote port.
|
||||
# 2. Otherwise the path is used as is, as a relative or absolute path to
|
||||
# a directory.
|
||||
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
|
||||
```
|
||||
|
||||
Java
|
||||
: ```
|
||||
# Directory of LMDB file. There are two options:
|
||||
# 1. A relative or absolute path to a directory that ends with 'ddata'
|
||||
# the full name of the directory will contain name of the ActorSystem
|
||||
# and its remote port.
|
||||
# 2. Otherwise the path is used as is, as a relative or absolute path to
|
||||
# a directory.
|
||||
akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
|
||||
```
|
||||
|
||||
|
||||
When running in production you may want to configure the directory to a specific
|
||||
path (option 2 above), since the default directory contains the remote port of the
|
||||
actor system to make the name unique. If using a dynamically assigned
|
||||
port (0) it will be different each time and the previously stored data
|
||||
will not be loaded.
|
||||
|
||||
Making the data durable has a performance cost. By default, each update is flushed
|
||||
to disk before the `UpdateSuccess` reply is sent. For better performance, but with the risk of losing
|
||||
the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
|
||||
a time period before they are written to LMDB and flushed to disk. Enabling write behind is especially
|
||||
efficient when performing many writes to the same key, because it is only the last value for each key
|
||||
that will be serialized and stored. The risk of losing writes if the JVM crashes is small since the
|
||||
data is typically replicated to other nodes immediately according to the given `WriteConsistency`.
|
||||
|
||||
```
|
||||
akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms
|
||||
```
|
||||
|
||||
Note that you should be prepared to receive `WriteFailure` as the reply to an `Update` of a
|
||||
durable entry if the data could not be stored for some reason. When enabling `write-behind-interval`
|
||||
such errors will only be logged and `UpdateSuccess` will still be the reply to the `Update`.
|
||||
|
||||
There is one important caveat when it comes to pruning of [CRDT Garbage](#crdt-garbage) for durable data.
|
||||
If an old data entry that was never pruned is injected and merged with existing data after
|
||||
the pruning markers have been removed, the value will not be correct. The time-to-live
|
||||
of the markers is defined by the configuration
|
||||
`akka.cluster.distributed-data.durable.remove-pruning-marker-after` and is in the magnitude of days.
|
||||
This can happen if a node with durable data didn't participate in the pruning
|
||||
(e.g. it was shut down) and was started again after this time. A node with durable data should not
|
||||
be stopped for longer than this duration, and if it joins again after this
|
||||
duration, its data should first be manually removed (from the LMDB directory).
|
||||
|
|
@ -5,11 +5,12 @@
|
|||
@@@ index
|
||||
|
||||
* [cluster](cluster.md)
|
||||
* [cluster-specification](cluster-specification.md)
|
||||
* [cluster-specification](cluster-concepts.md)
|
||||
* [cluster-membership](cluster-membership.md)
|
||||
* [distributed-data](distributed-data.md)
|
||||
* [cluster-singleton](cluster-singleton.md)
|
||||
* [cluster-sharding](cluster-sharding.md)
|
||||
* [cluster-sharding-specification](cluster-sharding-concepts.md)
|
||||
* [serialization](../serialization.md)
|
||||
* [serialization-jackson](../serialization-jackson.md)
|
||||
* [multi-jvm-testing](../multi-jvm-testing.md)
|
||||
|
|
|
|||