diff --git a/akka-docs/src/main/paradox/additional/osgi.md b/akka-docs/src/main/paradox/additional/osgi.md index 9e4d197414..1426c5892c 100644 --- a/akka-docs/src/main/paradox/additional/osgi.md +++ b/akka-docs/src/main/paradox/additional/osgi.md @@ -86,7 +86,7 @@ in an application composed of multiple JARs to reside under a single package nam might scan all classes from `com.example.plugins` for specific service implementations with that package existing in several contributed JARs. While it is possible to support overlapping packages with complex manifest headers, it's much better to use non-overlapping -package spaces and facilities such as @ref:[Akka Cluster](../typed/cluster-specification.md) +package spaces and facilities such as @ref:[Akka Cluster](../typed/cluster-concepts.md) for service discovery. Stylistically, many organizations opt to use the root package path as the name of the bundle distribution file. diff --git a/akka-docs/src/main/paradox/additional/rolling-updates.md b/akka-docs/src/main/paradox/additional/rolling-updates.md index a452e56f41..b5c7a6fd8f 100644 --- a/akka-docs/src/main/paradox/additional/rolling-updates.md +++ b/akka-docs/src/main/paradox/additional/rolling-updates.md @@ -36,7 +36,7 @@ Additionally you can find advice on @ref:[Persistence - Schema Evolution](../per ## Cluster Sharding -During a rolling upgrade, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../cluster-sharding.md#shard-rebalancing), +During a rolling upgrade, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../typed/cluster-sharding-concepts.md#shard-rebalancing), to an old or new node in the cluster, based on the pluggable allocation strategy and settings. When an old node is stopped the shards that were running on it are moved to one of the other old nodes remaining in the cluster. The `ShardCoordinator` is itself a cluster singleton. diff --git a/akka-docs/src/main/paradox/cluster-dc.md b/akka-docs/src/main/paradox/cluster-dc.md index a25d622224..8d464a3b19 100644 --- a/akka-docs/src/main/paradox/cluster-dc.md +++ b/akka-docs/src/main/paradox/cluster-dc.md @@ -77,7 +77,7 @@ up a large cluster into smaller groups of nodes for better scalability. ## Membership Some @ref[membership transitions](typed/cluster-membership.md#membership-lifecycle) are managed by -one node called the @ref[leader](typed/cluster-specification.md#leader). There is one leader per data center +one node called the @ref[leader](typed/cluster-concepts.md#leader). There is one leader per data center and it is responsible for these transitions for the members within the same data center. Members of other data centers are managed independently by the leader of the respective data center. These actions cannot be performed while there are any unreachability observations among the nodes in the data center, @@ -105,7 +105,7 @@ Java ## Failure Detection -@ref[Failure detection](typed/cluster-specification.md#failure-detector) is performed by sending heartbeat messages +@ref[Failure detection](typed/cluster-concepts.md#failure-detector) is performed by sending heartbeat messages to detect if a node is unreachable. This is done more frequently and with more certainty among the nodes in the same data center than across data centers. The failure detection across different data centers should be interpreted as an indication of problem with the network link between the data centers. 
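+
+To make the data center membership concrete: each node declares which data center it belongs to in its
+configuration, and the per-data-center leader and the more frequent in-data-center failure detection
+described above are scoped by that value. A minimal sketch (the `dc-east` value is only a placeholder):
+
+```
+akka.cluster.multi-data-center.self-data-center = "dc-east"
+```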
diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index f893ccd109..60ad4781b7 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -1,7 +1,7 @@ # Classic Cluster Sharding @@include[includes.md](includes.md) { #actor-api } -For the new API see @ref:[cluster-sharding](typed/cluster-sharding.md). +For the new API see @ref:[Cluster Sharding](typed/cluster-sharding.md). ## Dependency @@ -22,38 +22,9 @@ to see what this looks like in practice. ## Introduction -Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to -be able to interact with them using their logical identifier, but without having to care about -their physical location in the cluster, which might also change over time. +For an introduction to Sharding concepts see @ref:[Cluster Sharding](typed/cluster-sharding.md). -It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. -Here we call these actors "entities". These actors typically have persistent (durable) state, -but this feature is not limited to actors with persistent state. - -Cluster sharding is typically used when you have many stateful actors that together consume -more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors -it might be easier to run them on a @ref:[Cluster Singleton](cluster-singleton.md) node. - -In this context sharding means that actors with an identifier, so called entities, -can be automatically distributed across multiple nodes in the cluster. Each entity -actor runs only at one place, and messages can be sent to the entity without requiring -the sender to know the location of the destination actor. This is achieved by sending -the messages via a `ShardRegion` actor provided by this extension, which knows how -to route the message with the entity id to the final destination. - -Cluster sharding will not be active on members with status @ref:[WeaklyUp](typed/cluster-membership.md#weaklyup-members) -if that feature is enabled. - -@@@ warning - -**Don't use Cluster Sharding together with Automatic Downing**, -since it allows the cluster to split up into two separate clusters, which in turn will result -in *multiple shards and entities* being started, one in each separate cluster! -See @ref:[Downing](typed/cluster.md#automatic-vs-manual-downing). - -@@@ - -## An Example +## Basic example This is how an entity actor may look like: @@ -140,205 +111,36 @@ A more comprehensive sample is available in the ## How it works -The `ShardRegion` actor is started on each node in the cluster, or group of nodes -tagged with a specific role. The `ShardRegion` is created with two application specific -functions to extract the entity identifier and the shard identifier from incoming messages. -A `Shard` is a group of entities that will be managed together. For the first message in a -specific shard the `ShardRegion` requests the location of the shard from a central coordinator, -the `ShardCoordinator`. - -The `ShardCoordinator` decides which `ShardRegion` shall own the `Shard` and informs -that `ShardRegion`. The region will confirm this request and create the `Shard` supervisor -as a child actor. The individual `Entities` will then be created when needed by the `Shard` -actor. Incoming messages thus travel via the `ShardRegion` and the `Shard` to the target -`Entity`. 
- -If the shard home is another `ShardRegion` instance messages will be forwarded -to that `ShardRegion` instance instead. While resolving the location of a -shard incoming messages for that shard are buffered and later delivered when the -shard home is known. Subsequent messages to the resolved shard can be delivered -to the target destination immediately without involving the `ShardCoordinator`. - -### Scenarios - -Once a `Shard` location is known `ShardRegion`s send messages directly. Here are the -scenarios for getting to this state. In the scenarios the following notation is used: - -* `SC` - ShardCoordinator -* `M#` - Message 1, 2, 3, etc -* `SR#` - ShardRegion 1, 2 3, etc -* `S#` - Shard 1 2 3, etc -* `E#` - Entity 1 2 3, etc. An entity refers to an Actor managed by Cluster Sharding. - -Where `#` is a number to distinguish between instances as there are multiple in the Cluster. - -#### Scenario 1: Message to an unknown shard that belongs to the local ShardRegion - - 1. Incoming message `M1` to `ShardRegion` instance `SR1`. - 2. `M1` is mapped to shard `S1`. `SR1` doesn't know about `S1`, so it asks the `SC` for the location of `S1`. - 3. `SC` answers that the home of `S1` is `SR1`. - 4. `SR1` creates child actor shard `S1` and forwards the message to it. - 5. `S1` creates child actor for `E1` and forwards the message to it. - 6. All incoming messages for `S1` which arrive at `SR1` can be handled by `SR1` without `SC`. - -#### Scenario 2: Message to an unknown shard that belongs to a remote ShardRegion - - 1. Incoming message `M2` to `ShardRegion` instance `SR1`. - 2. `M2` is mapped to `S2`. `SR1` doesn't know about `S2`, so it asks `SC` for the location of `S2`. - 3. `SC` answers that the home of `S2` is `SR2`. - 4. `SR1` sends buffered messages for `S2` to `SR2`. - 5. All incoming messages for `S2` which arrive at `SR1` can be handled by `SR1` without `SC`. It forwards messages to `SR2`. - 6. `SR2` receives message for `S2`, ask `SC`, which answers that the home of `S2` is `SR2`, and we are in Scenario 1 (but for `SR2`). - - -### Shard location - -To make sure that at most one instance of a specific entity actor is running somewhere -in the cluster it is important that all nodes have the same view of where the shards -are located. Therefore the shard allocation decisions are taken by the central -`ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on -the oldest member among all cluster nodes or a group of nodes tagged with a specific -role. - -The logic that decides where a shard is to be located is defined in a pluggable shard -allocation strategy. The default implementation `ShardCoordinator.LeastShardAllocationStrategy` -allocates new shards to the `ShardRegion` with least number of previously allocated shards. -This strategy can be replaced by an application specific implementation. - -### Shard Rebalancing - -To be able to use newly added members in the cluster the coordinator facilitates rebalancing -of shards, i.e. migrate entities from one node to another. In the rebalance process the -coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. -That means they will start buffering incoming messages for that shard, in the same way as if the -shard location is unknown. During the rebalance process the coordinator will not answer any -requests for the location of shards that are being rebalanced, i.e. local buffering will -continue until the handoff is completed. 
The `ShardRegion` responsible for the rebalanced shard -will stop all entities in that shard by sending the specified `stopMessage` -(default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion` -owning the entities will acknowledge the handoff as completed to the coordinator. -Thereafter the coordinator will reply to requests for the location of -the shard, thereby allocating a new home for the shard, and then buffered messages in the -`ShardRegion` actors are delivered to the new location. This means that the state of the entities -are not transferred or migrated. If the state of the entities are of importance it should be -persistent (durable), e.g. with @ref:[Persistence](persistence.md), so that it can be recovered at the new -location. - -The logic that decides which shards to rebalance is defined in a pluggable shard -allocation strategy. The default implementation `ShardCoordinator.LeastShardAllocationStrategy` -picks shards for handoff from the `ShardRegion` with most number of previously allocated shards. -They will then be allocated to the `ShardRegion` with least number of previously allocated shards, -i.e. new members in the cluster. - -For the `LeastShardAllocationStrategy` there is a configurable threshold (`rebalance-threshold`) of -how large the difference must be to begin the rebalancing. The difference between number of shards in -the region with most shards and the region with least shards must be greater than the `rebalance-threshold` -for the rebalance to occur. - -A `rebalance-threshold` of 1 gives the best distribution and therefore typically the best choice. -A higher threshold means that more shards can be rebalanced at the same time instead of one-by-one. -That has the advantage that the rebalance process can be quicker but has the drawback that the -the number of shards (and therefore load) between different nodes may be significantly different. - -### ShardCoordinator State - -The state of shard locations in the `ShardCoordinator` is persistent (durable) with -@ref:[Distributed Data](distributed-data.md) to survive failures. - -When a crashed or -unreachable coordinator node has been removed (via down) from the cluster a new `ShardCoordinator` singleton -actor will take over and the state is recovered. During such a failure period shards -with a known location are still available, while messages for new (unknown) shards -are buffered until the new `ShardCoordinator` becomes available. - -### Message ordering - -As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity -actor the order of the messages is preserved. As long as the buffer limit is not reached -messages are delivered on a best effort basis, with at-most once delivery semantics, -in the same way as ordinary message sending. Reliable end-to-end messaging, with -at-least-once semantics can be added by using `AtLeastOnceDelivery` in @ref:[Persistence](persistence.md). - -### Overhead - -Some additional latency is introduced for messages targeted to new or previously -unused shards due to the round-trip to the coordinator. Rebalancing of shards may -also add latency. This should be considered when designing the application specific -shard resolution, e.g. to avoid too fine grained shards. Once a shard's location is known -the only overhead is sending a message via the `ShardRegion` rather than directly. +See @ref:[Cluster Sharding concepts](typed/cluster-sharding-concepts.md) in the documentation of the new APIs. 
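+
+As a quick illustration of that flow, every message for an entity is sent through the locally running
+`ShardRegion` actor for its entity type. A minimal sketch with the classic API (the `"Counter"` type name
+is the one from the basic example above, and `Get` is only a stand-in for one of its messages):
+
+```scala
+import akka.actor.{ ActorRef, ActorSystem }
+import akka.cluster.sharding.ClusterSharding
+
+object CounterClient {
+  // Stand-in for a message type handled by the Counter entity
+  final case class Get(counterId: Long)
+
+  def tellEntity(system: ActorSystem): Unit = {
+    // Look up the local ShardRegion for the "Counter" entity type, which must
+    // have been started with ClusterSharding(system).start(...) beforehand
+    val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
+
+    // Routed via the ShardRegion (and, for the first message to a shard,
+    // the ShardCoordinator) to the entity selected by extractEntityId
+    counterRegion ! Get(123)
+  }
+}
+```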
## Sharding State Store Mode There are two cluster sharding states managed: -1. @ref:[ShardCoordinator State](#shardcoordinator-state) - the `Shard` locations + +1. @ref:[ShardCoordinator State](typed/cluster-sharding-concepts.md#shardcoordinator-state) - the `Shard` locations 1. @ref:[Remembering Entities](#remembering-entities) - the entities in each `Shard`, which is optional, and disabled by default For these, there are currently two modes which define how these states are stored: + * @ref:[Distributed Data Mode](#distributed-data-mode) - uses Akka @ref:[Distributed Data](distributed-data.md) (CRDTs) (the default) * @ref:[Persistence Mode](#persistence-mode) - (deprecated) uses Akka @ref:[Persistence](persistence.md) (Event Sourcing) -@@@ warning - -Persistence for state store mode is deprecated. - -@@@ +@@include[cluster.md](includes/cluster.md) { #sharding-persistence-mode-deprecated } Changing the mode requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change). ### Distributed Data Mode -This mode is enabled with configuration (enabled by default): - -``` -akka.cluster.sharding.state-store-mode = ddata -``` - The state of the `ShardCoordinator` is replicated across the cluster but is not durable, not stored to disk. The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency. When all nodes in the cluster have been stopped, the state is no longer needed and dropped. -The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to -disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available. - -Cluster Sharding uses its own Distributed Data `Replicator` per node. -If using roles with sharding there is one `Replicator` per role, which enables a subset of -all nodes for some entity types and another subset for other entity types. Each such replicator has a name -that contains the node role and therefore the role configuration must be the same on all nodes in the -cluster, for example you can't change the roles when performing a rolling upgrade. -Changing roles requires @ref:[a full cluster restart](additional/rolling-updates.md#cluster-sharding-configuration-change). - -The settings for Distributed Data are configured in the section -`akka.cluster.sharding.distributed-data`. It's not possible to have different -`distributed-data` settings for different sharding entity types. +See @ref:[Distributed Data mode](typed/cluster-sharding.md#distributed-data-mode) in the documentation of the new APIs. ### Persistence Mode -This mode is enabled with configuration: - -``` -akka.cluster.sharding.state-store-mode = persistence -``` - -Since it is running in a cluster @ref:[Persistence](persistence.md) must be configured with a distributed journal. - -@@@ note - -Persistence mode for @ref:[Remembering Entities](#remembering-entities) will be replaced by a pluggable data access API with storage implementations. -New sharding applications should no longer choose persistence mode. Existing users of persistence mode -[can eventually migrate to the replacement options](https://github.com/akka/akka/issues/26177). - -@@@ - -## Startup after minimum number of members - -It's good to use Cluster Sharding with the Cluster setting `akka.cluster.min-nr-of-members` or -`akka.cluster.role..min-nr-of-members`. 
That will defer the allocation of the shards -until at least that number of regions have been started and registered to the coordinator. This -avoids that many shards are allocated to the first region that registers and only later are -rebalanced to other nodes. - -See @ref:[How To Startup when Cluster Size Reached](cluster-usage.md#min-members) for more information about `min-nr-of-members`. +See @ref:[Persistence Mode](typed/cluster-sharding.md#persistence-mode) in the documentation of the new APIs. ## Proxy Only Mode @@ -362,30 +164,20 @@ then supposed to stop itself. Incoming messages will be buffered by the `Shard` between reception of `Passivate` and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity. -### Automatic Passivation - -The entities are automatically passivated if they haven't received a message within the duration configured in -`akka.cluster.sharding.passivate-idle-entity-after` -or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable -time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages -to the `ActorRef` or messages that the actor sends to itself are not counted in this activity. -Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. -It is always disabled if @ref:[Remembering Entities](#remembering-entities) is enabled. - +See @ref:[Automatic Passivation](typed/cluster-sharding.md#automatic-passivation) in the documentation of the new APIs. + ## Remembering Entities -Remembering entities pertains to restarting entities after a rebalance or recovering from a crash. -Enabling or disabling (the default) this feature drives the behavior of the restarts: -* enabled: entities are restarted, even though no new messages are sent to them -* disabled: entities are restarted, on demand when a new message arrives - +See @ref:[Remembering Entities](typed/cluster-sharding.md#remembering-entities) in the documentation of the new APIs, +including behavior when enabled and disabled. + Note that the state of the entities themselves will not be restored unless they have been made persistent, for example with @ref:[Event Sourcing](persistence.md). To make the list of entities in each `Shard` persistent (durable), set the `rememberEntities` flag to true in `ClusterShardingSettings` when calling -`ClusterSharding.start` and making sure the `shardIdExtractor` handles +`ClusterSharding.start` and make sure the `shardIdExtractor` handles `Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to extract from the `EntityId`. @@ -394,42 +186,7 @@ Scala Java : @@snip [ClusterShardingTest.java](/akka-docs/src/test/java/jdocs/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity } - -This can also be configured by setting `akka.cluster.sharding.remember-entities = on`. - -The performance cost of `rememberEntities` is rather high when starting/stopping entities and when -shards are rebalanced. This cost increases with number of entities per shard, thus it is not -recommended with more than 10000 active (non passivated) entities per shard. - -### Behavior When Enabled - -When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto another -node or recovers after a crash it will recreate all the entities which were previously -running in that `Shard`. 
To permanently stop entities, a `Passivate` message must be -sent to the parent of the entity actor, otherwise the entity will be automatically -restarted after the entity restart backoff specified in the configuration. - -When [Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are -stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the -configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since -the default directory contains the remote port of the actor system. If using a dynamically -assigned port (0) it will be different each time and the previously stored data will not -be loaded. - -The reason for storing the identifiers of the active entities in durable storage, i.e. stored to -disk, is that the same entities should be started also after a complete cluster restart. If this is not needed -you can disable durable storage and benefit from better performance by using the following configuration: - -``` -akka.cluster.sharding.distributed-data.durable.keys = [] -``` - -### Behavior When Not Enabled - -When `rememberEntities` is disabled (the default), a `Shard` will not automatically restart any entities -after a rebalance or recovering from a crash. Instead, entities are started once the first message -for that entity has been received in the `Shard`. - + ## Supervision If you need to use another `supervisorStrategy` for the entity actors than the default (restarting) strategy @@ -468,67 +225,15 @@ graceful leaving process of a cluster member. ## Removal of Internal Cluster Sharding Data -The Cluster Sharding `ShardCoordinator` stores locations of the shards. -This data is safely be removed when restarting the whole Akka Cluster. -Note that this does not include application data. - -There is a utility program `akka.cluster.sharding.RemoveInternalClusterShardingData` -that removes this data. - -@@@ warning - -Never use this program while there are running Akka Cluster nodes that are -using Cluster Sharding. Stop all Cluster nodes before using this program. - -@@@ - -It can be needed to remove the data if the Cluster Sharding coordinator -cannot startup because of corrupt data, which may happen if accidentally -two clusters were running at the same time, e.g. caused by using auto-down -and there was a network partition. - -@@@ warning - -**Don't use Cluster Sharding together with Automatic Downing**, -since it allows the cluster to split up into two separate clusters, which in turn will result -in *multiple shards and entities* being started, one in each separate cluster! -See @ref:[Downing](typed/cluster.md#automatic-vs-manual-downing). - -@@@ - -Use this program as a standalone Java main program: - -``` -java -classpath - akka.cluster.sharding.RemoveInternalClusterShardingData - -2.3 entityType1 entityType2 entityType3 -``` - -The program is included in the `akka-cluster-sharding` jar file. It -is easiest to run it with same classpath and configuration as your ordinary -application. It can be run from sbt or Maven in similar way. - -Specify the entity type names (same as you use in the `start` method -of `ClusterSharding`) as program arguments. - -If you specify `-2.3` as the first program argument it will also try -to remove data that was stored by Cluster Sharding in Akka 2.3.x using -different persistenceId. 
+See @ref:[removal of Internal Cluster Sharding Data](typed/cluster-sharding.md#removal-of-internal-cluster-sharding-data) in the documentation of the new APIs. ## Configuration -The `ClusterSharding` extension can be configured with the following properties. These configuration -properties are read by the `ClusterShardingSettings` when created with a `ActorSystem` parameter. -It is also possible to amend the `ClusterShardingSettings` or create it from another config section -with the same layout as below. `ClusterShardingSettings` is a parameter to the `start` method of +`ClusterShardingSettings` is a parameter to the `start` method of the `ClusterSharding` extension, i.e. each each entity type can be configured with different settings if needed. -@@snip [reference.conf](/akka-cluster-sharding/src/main/resources/reference.conf) { #sharding-ext-config } - -Custom shard allocation strategy can be defined in an optional parameter to -`ClusterSharding.start`. See the API documentation of @scala[`ShardAllocationStrategy`] @java[`AbstractShardAllocationStrategy`] for details -of how to implement a custom shard allocation strategy. +See @ref:[configuration](typed/cluster-sharding.md#configuration) for more information. ## Inspecting cluster sharding state diff --git a/akka-docs/src/main/paradox/cluster-usage.md b/akka-docs/src/main/paradox/cluster-usage.md index bc5a5dfb4b..740dfdc1ea 100644 --- a/akka-docs/src/main/paradox/cluster-usage.md +++ b/akka-docs/src/main/paradox/cluster-usage.md @@ -3,7 +3,7 @@ This document describes how to use Akka Cluster and the Cluster APIs using code samples. For specific documentation topics see: -* @ref:[Cluster Specification](typed/cluster-specification.md) +* @ref:[Cluster Specification](typed/cluster-concepts.md) * @ref:[Cluster Membership Service](typed/cluster-membership.md) * @ref:[When and where to use Akka Cluster](typed/choosing-cluster.md) * @ref:[Higher level Cluster tools](#higher-level-cluster-tools) @@ -39,7 +39,7 @@ It contains instructions on how to run the `SimpleClusterApp`. ## When and where to use Akka Cluster -See [Choosing Akka Cluster](typed/choosing-cluster.md#when-and-where-to-use-akka-cluster). +See [Choosing Akka Cluster](typed/choosing-cluster.md#when-and-where-to-use-akka-cluster) in the documentation of the new APIs. ## Cluster API Extension @@ -204,11 +204,15 @@ It contains instructions on how to run the **Worker Dial-in Example** sample. ## Node Roles -See @ref:[Cluster Node Roles](typed/cluster.md#node-roles) +See @ref:[Cluster Node Roles](typed/cluster.md#node-roles) in the documentation of the new APIs. ## How To Startup when Cluster Size Reached +See @ref:[How to startup when a minimum number of members in the cluster is reached](typed/cluster.md#how-to-startup-when-a-cluster-size-is-reached) in the documentation of the new APIs. + +## How To Startup when Member is Up + A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size. @@ -229,8 +233,6 @@ akka.cluster.role { } ``` -## How To Startup when Member is Up - You can start actors or trigger any functions using the `registerOnMemberUp` callback, which will be invoked when the current member status is changed to 'Up'. This can additionally be used with `akka.cluster.min-nr-of-members` optional configuration to defer an action until the cluster has reached a certain size. @@ -304,7 +306,7 @@ See @ref:[Cluster Metrics](cluster-metrics.md). 
The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. Please see: -* @ref:[Failure Detector specification](typed/cluster-specification.md#failure-detector) +* @ref:[Failure Detector specification](typed/cluster-concepts.md#failure-detector) * @ref:[Phi Accrual Failure Detector](typed/failure-detector.md) implementation * [Using the Failure Detector](typed/cluster.md#using-the-failure-detector) diff --git a/akka-docs/src/main/paradox/distributed-data.md b/akka-docs/src/main/paradox/distributed-data.md index e460b2a046..1accdb4547 100644 --- a/akka-docs/src/main/paradox/distributed-data.md +++ b/akka-docs/src/main/paradox/distributed-data.md @@ -658,88 +658,7 @@ Java ### Durable Storage -By default the data is only kept in memory. It is redundant since it is replicated to other nodes -in the cluster, but if you stop all nodes the data is lost, unless you have saved it -elsewhere. - -Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded -next time the replicator is started, i.e. when actor system is restarted. This means data will survive as -long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries -are configured with: - -``` -akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"] -``` - -Prefix matching is supported by using `*` at the end of a key. - -All entries can be made durable by specifying: - -``` -akka.cluster.distributed-data.durable.keys = ["*"] -``` - -@scala[[LMDB](https://symas.com/products/lightning-memory-mapped-database/)]@java[[LMDB](https://github.com/lmdbjava/lmdbjava/)] is the default storage implementation. It is -possible to replace that with another implementation by implementing the actor protocol described in -`akka.cluster.ddata.DurableStore` and defining the `akka.cluster.distributed-data.durable.store-actor-class` -property for the new implementation. - -The location of the files for the data is configured with: - -Scala -: ``` -# Directory of LMDB file. There are two options: -# 1. A relative or absolute path to a directory that ends with 'ddata' -# the full name of the directory will contain name of the ActorSystem -# and its remote port. -# 2. Otherwise the path is used as is, as a relative or absolute path to -# a directory. -akka.cluster.distributed-data.durable.lmdb.dir = "ddata" -``` - -Java -: ``` -# Directory of LMDB file. There are two options: -# 1. A relative or absolute path to a directory that ends with 'ddata' -# the full name of the directory will contain name of the ActorSystem -# and its remote port. -# 2. Otherwise the path is used as is, as a relative or absolute path to -# a directory. -akka.cluster.distributed-data.durable.lmdb.dir = "ddata" -``` - - -When running in production you may want to configure the directory to a specific -path (alt 2), since the default directory contains the remote port of the -actor system to make the name unique. If using a dynamically assigned -port (0) it will be different each time and the previously stored data -will not be loaded. - -Making the data durable has a performance cost. By default, each update is flushed -to disk before the `UpdateSuccess` reply is sent. For better performance, but with the risk of losing -the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during -a time period before it is written to LMDB and flushed to disk. 
Enabling write behind is especially -efficient when performing many writes to the same key, because it is only the last value for each key -that will be serialized and stored. The risk of losing writes if the JVM crashes is small since the -data is typically replicated to other nodes immediately according to the given `WriteConsistency`. - -``` -akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms -``` - -Note that you should be prepared to receive `WriteFailure` as reply to an `Update` of a -durable entry if the data could not be stored for some reason. When enabling `write-behind-interval` -such errors will only be logged and `UpdateSuccess` will still be the reply to the `Update`. - -There is one important caveat when it comes pruning of [CRDT Garbage](#crdt-garbage) for durable data. -If an old data entry that was never pruned is injected and merged with existing data after -that the pruning markers have been removed the value will not be correct. The time-to-live -of the markers is defined by configuration -`akka.cluster.distributed-data.durable.remove-pruning-marker-after` and is in the magnitude of days. -This would be possible if a node with durable data didn't participate in the pruning -(e.g. it was shutdown) and later started after this time. A node with durable data should not -be stopped for longer time than this duration and if it is joining again after this -duration its data should first be manually removed (from the lmdb directory). +See @ref:[Durable Storage](typed/distributed-data.md#durable-storage) in the documentation of the new APIs. ### CRDT Garbage diff --git a/akka-docs/src/main/paradox/includes/cluster.md b/akka-docs/src/main/paradox/includes/cluster.md index 2b16f57470..de602bf33b 100644 --- a/akka-docs/src/main/paradox/includes/cluster.md +++ b/akka-docs/src/main/paradox/includes/cluster.md @@ -48,3 +48,13 @@ at startup by using some external tool or API. When joining to seed nodes you sh the node itself except for the node that is supposed to be the first seed node, which should be placed first in the parameter to the programmatic join. + + +<!-- #sharding-persistence-mode-deprecated --> +@@@ warning + +Persistence for state store mode is deprecated. + +@@@ +<!-- #sharding-persistence-mode-deprecated --> + diff --git a/akka-docs/src/main/paradox/stream/stream-refs.md b/akka-docs/src/main/paradox/stream/stream-refs.md index 371d5a988d..b084599533 100644 --- a/akka-docs/src/main/paradox/stream/stream-refs.md +++ b/akka-docs/src/main/paradox/stream/stream-refs.md @@ -41,7 +41,7 @@ implement manually. @@@ @@@ warning { title=IMPORTANT } - Use stream refs with Akka Cluster. The [failure detector can cause quarantining](../typed/cluster-specification.md#quarantined) if plain Akka remoting is used. + Use stream refs with Akka Cluster. The [failure detector can cause quarantining](../typed/cluster-concepts.md#quarantined) if plain Akka remoting is used.
@@@ ## Stream References diff --git a/akka-docs/src/main/paradox/typed/cluster-specification.md b/akka-docs/src/main/paradox/typed/cluster-concepts.md similarity index 100% rename from akka-docs/src/main/paradox/typed/cluster-specification.md rename to akka-docs/src/main/paradox/typed/cluster-concepts.md diff --git a/akka-docs/src/main/paradox/typed/cluster-membership.md b/akka-docs/src/main/paradox/typed/cluster-membership.md index a989d75df9..1e5aebfe86 100644 --- a/akka-docs/src/main/paradox/typed/cluster-membership.md +++ b/akka-docs/src/main/paradox/typed/cluster-membership.md @@ -1,8 +1,8 @@ # Cluster Membership Service The core of Akka Cluster is the cluster membership, to keep track of what nodes are part of the cluster and -their health. Cluster membership is communicated using @ref:[gossip](cluster-specification.md#gossip) and -@ref:[failure detection](cluster-specification.md#failure-detector). +their health. Cluster membership is communicated using @ref:[gossip](cluster-concepts.md#gossip) and +@ref:[failure detection](cluster-concepts.md#failure-detector). There are several @ref:[Higher level Cluster tools](../typed/cluster.md#higher-level-cluster-tools) that are built on top of the cluster membership service. diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding-concepts.md b/akka-docs/src/main/paradox/typed/cluster-sharding-concepts.md new file mode 100644 index 0000000000..e9b40a5bcf --- /dev/null +++ b/akka-docs/src/main/paradox/typed/cluster-sharding-concepts.md @@ -0,0 +1,130 @@ +# Cluster Sharding concepts + +The `ShardRegion` actor is started on each node in the cluster, or group of nodes +tagged with a specific role. The `ShardRegion` is created with two application specific +functions to extract the entity identifier and the shard identifier from incoming messages. +A `Shard` is a group of entities that will be managed together. For the first message in a +specific shard the `ShardRegion` requests the location of the shard from a central coordinator, +the `ShardCoordinator`. + +The `ShardCoordinator` decides which `ShardRegion` shall own the `Shard` and informs +that `ShardRegion`. The region will confirm this request and create the `Shard` supervisor +as a child actor. The individual `Entities` will then be created when needed by the `Shard` +actor. Incoming messages thus travel via the `ShardRegion` and the `Shard` to the target +`Entity`. + +If the shard home is another `ShardRegion` instance messages will be forwarded +to that `ShardRegion` instance instead. While resolving the location of a +shard incoming messages for that shard are buffered and later delivered when the +shard home is known. Subsequent messages to the resolved shard can be delivered +to the target destination immediately without involving the `ShardCoordinator`. + +### Scenarios + +Once a `Shard` location is known `ShardRegion`s send messages directly. Here are the +scenarios for getting to this state. In the scenarios the following notation is used: + +* `SC` - ShardCoordinator +* `M#` - Message 1, 2, 3, etc +* `SR#` - ShardRegion 1, 2 3, etc +* `S#` - Shard 1 2 3, etc +* `E#` - Entity 1 2 3, etc. An entity refers to an Actor managed by Cluster Sharding. + +Where `#` is a number to distinguish between instances as there are multiple in the Cluster. + +#### Scenario 1: Message to an unknown shard that belongs to the local ShardRegion + + 1. Incoming message `M1` to `ShardRegion` instance `SR1`. + 2. `M1` is mapped to shard `S1`. 
`SR1` doesn't know about `S1`, so it asks the `SC` for the location of `S1`. + 3. `SC` answers that the home of `S1` is `SR1`. + 4. `SR1` creates child actor shard `S1` and forwards the message to it. + 5. `S1` creates child actor for `E1` and forwards the message to it. + 6. All incoming messages for `S1` which arrive at `SR1` can be handled by `SR1` without `SC`. + +#### Scenario 2: Message to an unknown shard that belongs to a remote ShardRegion + + 1. Incoming message `M2` to `ShardRegion` instance `SR1`. + 2. `M2` is mapped to `S2`. `SR1` doesn't know about `S2`, so it asks `SC` for the location of `S2`. + 3. `SC` answers that the home of `S2` is `SR2`. + 4. `SR1` sends buffered messages for `S2` to `SR2`. + 5. All incoming messages for `S2` which arrive at `SR1` can be handled by `SR1` without `SC`. It forwards messages to `SR2`. + 6. `SR2` receives the message for `S2`, asks `SC`, which answers that the home of `S2` is `SR2`, and we are in Scenario 1 (but for `SR2`). + +### Shard location + +To make sure that at most one instance of a specific entity actor is running somewhere +in the cluster it is important that all nodes have the same view of where the shards +are located. Therefore the shard allocation decisions are taken by the central +`ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on +the oldest member among all cluster nodes or a group of nodes tagged with a specific +role. + +The logic that decides where a shard is to be located is defined in a +pluggable @ref:[shard allocation strategy](cluster-sharding.md#shard-allocation-strategy). + +### Shard rebalancing + +To be able to use newly added members in the cluster the coordinator facilitates rebalancing +of shards, i.e. migrating entities from one node to another. In the rebalance process the +coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. +That means they will start buffering incoming messages for that shard, in the same way as if the +shard location is unknown. During the rebalance process the coordinator will not answer any +requests for the location of shards that are being rebalanced, i.e. local buffering will +continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard +will stop all entities in that shard by sending the specified `stopMessage` +(default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion` +owning the entities will acknowledge the handoff as completed to the coordinator. +Thereafter the coordinator will reply to requests for the location of +the shard, thereby allocating a new home for the shard, and then buffered messages in the +`ShardRegion` actors are delivered to the new location. This means that the state of the entities +is not transferred or migrated. If the state of the entities is of importance it should be +persistent (durable), e.g. with @ref:[Persistence](persistence.md) (or see @ref:[Classic Persistence](../persistence.md)), so that it can be recovered at the new +location. + +The logic that decides which shards to rebalance is defined in a pluggable shard +allocation strategy. The default implementation `ShardCoordinator.LeastShardAllocationStrategy` +picks shards for handoff from the `ShardRegion` with the highest number of previously allocated shards. +They will then be allocated to the `ShardRegion` with the lowest number of previously allocated shards, +i.e. new members in the cluster.
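+
+The strategy can be tuned through configuration; a minimal sketch (the key names are assumed to match the
+`least-shard-allocation-strategy` section of the `akka-cluster-sharding` reference configuration, and the
+values are only examples), with the threshold discussed in the following paragraphs:
+
+```
+akka.cluster.sharding.least-shard-allocation-strategy {
+  # difference in number of shards required before a rebalance is started
+  rebalance-threshold = 1
+  # upper bound on how many shards are rebalanced in one round
+  max-simultaneous-rebalance = 3
+}
+```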
+ +For the `LeastShardAllocationStrategy` there is a configurable threshold (`rebalance-threshold`) of +how large the difference must be to begin the rebalancing. The difference between the number of shards in +the region with the most shards and the region with the least shards must be greater than the `rebalance-threshold` +for the rebalance to occur. + +A `rebalance-threshold` of 1 gives the best distribution and is therefore typically the best choice. +A higher threshold means that more shards can be rebalanced at the same time instead of one-by-one. +That has the advantage that the rebalance process can be quicker but has the drawback that the +number of shards (and therefore load) between different nodes may be significantly different. + +### ShardCoordinator state + +The state of shard locations in the `ShardCoordinator` is persistent (durable) with +@ref:[Distributed Data](distributed-data.md) (or see @ref:[Classic Distributed Data](../distributed-data.md)) to survive failures. + +When a crashed or +unreachable coordinator node has been removed (via down) from the cluster a new `ShardCoordinator` singleton +actor will take over and the state is recovered. During such a failure period shards +with a known location are still available, while messages for new (unknown) shards +are buffered until the new `ShardCoordinator` becomes available. + +### Message ordering + +As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity +actor the order of the messages is preserved. As long as the buffer limit is not reached +messages are delivered on a best-effort basis, with at-most-once delivery semantics, +in the same way as ordinary message sending. + +#### AtLeastOnceDelivery + +Reliable end-to-end messaging, with at-least-once semantics, can be added by using +`AtLeastOnceDelivery` with @ref:[Classic Persistence](../persistence.md#at-least-once-delivery); +see also @github[#20984](#20984) about `AtLeastOnceDelivery`, including redelivery with a backoff. + +### Overhead + +Some additional latency is introduced for messages targeted to new or previously +unused shards due to the round-trip to the coordinator. Rebalancing of shards may +also add latency. This should be considered when designing the application-specific +shard resolution, e.g. to avoid too fine-grained shards. Once a shard's location is known +the only overhead is sending a message via the `ShardRegion` rather than directly. diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 44d0096db6..d40f58bc0f 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -1,8 +1,14 @@ # Cluster Sharding +@@@ note + +For the Akka Classic API see @ref:[Classic Cluster Sharding](../cluster-sharding.md). + +@@@ + ## Dependency -To use Akka Cluster Sharding Typed, you must add the following dependency in your project: +To use Akka Cluster Sharding, you must add the following dependency in your project: @@dependency[sbt,Maven,Gradle] { group=com.typesafe.akka @@ -12,8 +18,36 @@ To use Akka Cluster Sharding Typed, you must add the following dependency in you ## Introduction -For an introduction to Sharding concepts see @ref:[Cluster Sharding](../cluster-sharding.md). This documentation shows how to use the typed -Cluster Sharding API.
+Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to +be able to interact with them using their logical identifier, but without having to care about +their physical location in the cluster, which might also change over time. + +It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. +Here we call these actors "entities". These actors typically have persistent (durable) state, +but this feature is not limited to actors with persistent state. + +Cluster sharding is typically used when you have many stateful actors that together consume +more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors +it might be easier to run them on a @ref:[Cluster Singleton](cluster-singleton.md) node. + +In this context sharding means that actors with an identifier, so called entities, +can be automatically distributed across multiple nodes in the cluster. Each entity +actor runs only at one place, and messages can be sent to the entity without requiring +the sender to know the location of the destination actor. This is achieved by sending +the messages via a `ShardRegion` actor provided by this extension, which knows how +to route the message with the entity id to the final destination. + +Cluster sharding will not be active on members with status @ref:[WeaklyUp](cluster-membership.md#weaklyup-members) +if that feature is enabled. + +@@@ warning + +**Don't use Cluster Sharding together with Automatic Downing**, +since it allows the cluster to split up into two separate clusters, which in turn will result +in *multiple shards and entities* being started, one in each separate cluster! +See @ref:[Downing](cluster.md#automatic-vs-manual-downing). + +@@@ ## Basic example @@ -87,6 +121,82 @@ is used but `tell` or any of the other @ref:[Interaction Patterns](interaction-p See @ref:[persistence](persistence.md) for more details. +## Shard allocation strategy + +The default implementation `akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy` +allocates new shards to the `ShardRegion` with least number of previously allocated shards. +This strategy can be replaced by an application specific implementation. + +An optional custom shard allocation strategy can be passed into the optional parameter when initializing an entity type +or explicitly using the `withAllocationStrategy` function. +See the API documentation of @scala[`akka.cluster.sharding.ShardAllocationStrategy`]@java[`akka.cluster.sharding.AbstractShardAllocationStrategy`] for details +of how to implement a custom `ShardAllocationStrategy`. + +## How it works + +See @ref:[Cluster Sharding concepts](cluster-sharding-concepts.md). + +## Sharding State Store Mode + +There are two cluster sharding states managed: + +1. @ref:[ShardCoordinator State](cluster-sharding-concepts.md#shardcoordinator-state) - the `Shard` locations +1. 
@ref:[Remembering Entities](#remembering-entities) - the entities in each `Shard`, which is optional, and disabled by default + +For these, there are currently two modes which define how these states are stored: + +* @ref:[Distributed Data Mode](#distributed-data-mode) - uses Akka @ref:[Distributed Data](distributed-data.md) (CRDTs) (the default) +* @ref:[Persistence Mode](#persistence-mode) - (deprecated) uses Akka @ref:[Persistence](persistence.md) (Event Sourcing) + +@@include[cluster.md](../includes/cluster.md) { #sharding-persistence-mode-deprecated } + +Changing the mode requires @ref:[a full cluster restart](../additional/rolling-updates.md#cluster-sharding-configuration-change). + +### Distributed Data Mode + +This mode is enabled with configuration (enabled by default): + +``` +akka.cluster.sharding.state-store-mode = ddata +``` + +The state of the `ShardCoordinator` is replicated across the cluster but is not durable, not stored to disk. +The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency. +When all nodes in the cluster have been stopped, the state is no longer needed and dropped. + +The state of @ref:[Remembering Entities](#remembering-entities) is durable and stored to +disk. This means remembered entities are restarted even after a complete (non-rolling) cluster restart when the disk is still available. + +Cluster Sharding uses its own Distributed Data `Replicator` per node. +If using roles with sharding there is one `Replicator` per role, which enables a subset of +all nodes for some entity types and another subset for other entity types. Each such replicator has a name +that contains the node role and therefore the role configuration must be the same on all nodes in the +cluster, for example you can't change the roles when performing a rolling upgrade. +Changing roles requires @ref:[a full cluster restart](../additional/rolling-updates.md#cluster-sharding-configuration-change). + +The settings for Distributed Data are configured in the section +`akka.cluster.sharding.distributed-data`. It's not possible to have different +`distributed-data` settings for different sharding entity types. + +### Persistence Mode + +This mode is enabled with configuration: + +``` +akka.cluster.sharding.state-store-mode = persistence +``` + +Since it is running in a cluster @ref:[Persistence](persistence.md) must be configured with a distributed journal. + +@@@ note + +Persistence mode for @ref:[Remembering Entities](#remembering-entities) will be replaced by a pluggable data access API with storage implementations, +see @github[#27763](#27763). +New sharding applications should no longer choose persistence mode. Existing users of persistence mode +[can eventually migrate to the replacement options](https://github.com/akka/akka/issues/26177). + +@@@ + ## Passivation If the state of the entities are persistent you may stop entities that are not used to @@ -94,7 +204,7 @@ reduce memory consumption. This is done by the application specific implementati the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). If a message is already enqueued to the entity when it stops itself the enqueued message in the mailbox will be dropped. 
To support graceful passivation without losing such -messages the entity actor can send `ClusterSharding.Passivate` to to the +messages the entity actor can send `ClusterSharding.Passivate` to the @scala[`ActorRef[ShardCommand]`]@java[`ActorRef`] that was passed in to the factory method when creating the entity. The optional `stopMessage` message will be sent back to the entity, which is then supposed to stop itself, otherwise it will @@ -125,10 +235,129 @@ message if the entity needs to perform some asynchronous cleanup or interactions The entities are automatically passivated if they haven't received a message within the duration configured in `akka.cluster.sharding.passivate-idle-entity-after` -or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable +or by explicitly setting `passivateIdleEntityAfter` on `ClusterShardingSettings` to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the `ActorRef` or messages that the actor sends to itself are not counted in this activity. Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`. -It is always disabled if @ref:[Remembering Entities](../cluster-sharding.md#remembering-entities) is enabled. +It is disabled automatically if @ref:[Remembering Entities](#remembering-entities) is enabled. +## Remembering Entities +Remembering entities pertains to restarting entities after a rebalance or recovering from a crash. +Enabling or disabling (the default) this feature drives the behavior of the restarts: + +* enabled: entities are restarted, even though no new messages are sent to them. This will also automatically disable @ref:[Passivation](#passivation). +* disabled: entities are restarted on demand when a new message arrives. + +Note that the state of the entities themselves will not be restored unless they have been made persistent, +for example with @ref:[Event Sourcing](persistence.md). + +To make the list of entities in each `Shard` persistent (durable): + +1. set the `rememberEntities` flag to true in `ClusterShardingSettings` when +starting a shard region (or its proxy) for a given entity type, +or configure `akka.cluster.sharding.remember-entities = on`, +1. make sure it is possible to extract the ID of the sharded entity from the message. + +The performance cost of `rememberEntities` is rather high when starting/stopping entities and when +shards are rebalanced. This cost increases with the number of entities per shard, thus it is not +recommended with more than 10000 active entities per shard. + +### Behavior When Enabled + +When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto another +node or recovers after a crash it will recreate all the entities which were previously +running in that `Shard`. To permanently stop entities, a `Passivate` message must be +sent to the parent of the entity actor, otherwise the entity will be automatically +restarted after the entity restart backoff specified in the configuration. + +When [Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are +stored in @ref:[Durable Storage](distributed-data.md#durable-storage) of Distributed Data. You may want to change the +configuration of the `akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since +the default directory contains the remote port of the actor system.
If using a dynamically +assigned port (0) it will be different each time and the previously stored data will not +be loaded. + +The reason for storing the identifiers of the active entities in durable storage, i.e. stored to +disk, is that the same entities should be started also after a complete cluster restart. If this is not needed +you can disable durable storage and benefit from better performance by using the following configuration: + +``` +akka.cluster.sharding.distributed-data.durable.keys = [] +``` + +### Behavior When Not Enabled + +When `rememberEntities` is disabled (the default), a `Shard` will not automatically restart any entities +after a rebalance or recovering from a crash. Instead, entities are started once the first message +for that entity has been received in the `Shard`. + +### Startup after minimum number of members + +It's recommended to use Cluster Sharding with the Cluster setting `akka.cluster.min-nr-of-members` or +`akka.cluster.role..min-nr-of-members`. `min-nr-of-members` will defer the allocation of the shards +until at least that number of regions have been started and registered to the coordinator. This +avoids many shards being allocated to the first region that registers and only later being +rebalanced to other nodes. + +See @ref:[How To Startup when a Cluster size is reached](cluster.md#how-to-startup-when-a-cluster-size-is-reached) +for more information about `min-nr-of-members`. + +## Removal of internal Cluster Sharding data + +Removal of internal Cluster Sharding data is only relevant for "Persistence Mode". +The Cluster Sharding `ShardCoordinator` stores locations of the shards. +This data can safely be removed when restarting the whole Akka Cluster. +Note that this does not include application data. + +There is a utility program `akka.cluster.sharding.RemoveInternalClusterShardingData` +that removes this data. + +@@@ warning + +Never use this program while there are running Akka Cluster nodes that are +using Cluster Sharding. Stop all Cluster nodes before using this program. + +@@@ + +It can be necessary to remove the data if the Cluster Sharding coordinator +cannot start up because of corrupt data, which may happen if accidentally +two clusters were running at the same time, e.g. caused by using auto-down +and there was a network partition. + +@@@ warning + +**Don't use Cluster Sharding together with Automatic Downing**, +since it allows the cluster to split up into two separate clusters, which in turn will result +in *multiple shards and entities* being started, one in each separate cluster! +See @ref:[Downing](cluster.md#automatic-vs-manual-downing). + +@@@ + +Use this program as a standalone Java main program: + +``` +java -classpath + akka.cluster.sharding.RemoveInternalClusterShardingData + -2.3 entityType1 entityType2 entityType3 +``` + +The program is included in the `akka-cluster-sharding` jar file. It +is easiest to run it with the same classpath and configuration as your ordinary +application. It can be run from sbt or Maven in a similar way. + +Specify the entity type names (same as you use in the `start` method +of `ClusterSharding`) as program arguments. + +If you specify `-2.3` as the first program argument it will also try +to remove data that was stored by Cluster Sharding in Akka 2.3.x using +different persistenceId. + +## Configuration + +The `ClusterSharding` extension can be configured with the following properties. These configuration +properties are read by the `ClusterShardingSettings` when created with an `ActorSystem` parameter.
+
+@@snip [reference.conf](/akka-cluster-sharding/src/main/resources/reference.conf) { #sharding-ext-config }
diff --git a/akka-docs/src/main/paradox/typed/cluster.md b/akka-docs/src/main/paradox/typed/cluster.md
index ceb1aa160a..4281a99156 100644
--- a/akka-docs/src/main/paradox/typed/cluster.md
+++ b/akka-docs/src/main/paradox/typed/cluster.md
@@ -3,7 +3,7 @@
 This document describes how to use Akka Cluster and the Cluster APIs.
 For specific documentation topics see:

-* @ref:[Cluster Specification](cluster-specification.md)
+* @ref:[Cluster Specification](cluster-concepts.md)
 * @ref:[Cluster Membership Service](cluster-membership.md)
 * @ref:[When and where to use Akka Cluster](choosing-cluster.md)
 * @ref:[Higher level Cluster tools](#higher-level-cluster-tools)
@@ -328,7 +328,7 @@ The roles are part of the membership information in `MemberEvent` that you can s
 The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
 unreachable from the rest of the cluster. Please see:

-* @ref:[Failure Detector specification](cluster-specification.md#failure-detector)
+* @ref:[Failure Detector specification](cluster-concepts.md#failure-detector)
 * @ref:[Phi Accrual Failure Detector](failure-detector.md) implementation
 * [Using the Failure Detector](#using-the-failure-detector)

@@ -361,6 +361,28 @@ There are several configuration properties for the cluster. Refer to the
 @ref:[reference configuration](../general/configuration.md#config-akka-cluster) for full configuration
 descriptions, default values and options.

+### How To Startup when a Cluster size is reached
+
+A common use case is to start actors after the cluster has been initialized,
+members have joined, and the cluster has reached a certain size.
+
+With a configuration option you can define the required number of members
+before the leader changes the status of 'Joining' members to 'Up':
+
+```
+akka.cluster.min-nr-of-members = 3
+```
+
+In a similar way you can define the required number of members of a certain role
+before the leader changes the status of 'Joining' members to 'Up':
+
+```
+akka.cluster.role {
+  frontend.min-nr-of-members = 1
+  backend.min-nr-of-members = 2
+}
+```
+
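+The configuration above only controls when joining members are moved to 'Up'. To defer starting
+application actors until this node has become 'Up', one option is to subscribe to the cluster's
+`SelfUp` event (a minimal sketch using the typed API; adapt it to your own root behavior):
+
+```scala
+import akka.actor.typed.Behavior
+import akka.actor.typed.scaladsl.Behaviors
+import akka.cluster.typed.{ Cluster, SelfUp, Subscribe }
+
+// Sketch of a root behavior that waits until this node is 'Up' (which, together
+// with min-nr-of-members, implies the required cluster size has been reached)
+// before spawning the rest of the application.
+object RootBehavior {
+  def apply(): Behavior[SelfUp] = Behaviors.setup { context =>
+    Cluster(context.system).subscriptions ! Subscribe(context.self, classOf[SelfUp])
+    Behaviors.receiveMessage { _ =>
+      context.log.info("Node is Up, starting application actors")
+      // context.spawn(...) the application actors here
+      Behaviors.empty
+    }
+  }
+}
+```
+
+With the classic API, `Cluster(system).registerOnMemberUp` serves the same purpose.
+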
 ### Cluster Info Logging

 You can silence the logging of cluster events at info level with the configuration property:
diff --git a/akka-docs/src/main/paradox/typed/distributed-data.md b/akka-docs/src/main/paradox/typed/distributed-data.md
index f5a020502b..a5918d9350 100644
--- a/akka-docs/src/main/paradox/typed/distributed-data.md
+++ b/akka-docs/src/main/paradox/typed/distributed-data.md
@@ -111,3 +111,87 @@ actor ref. All such `Replicator`s must run on the same path in the classic actor
 A standalone `ReplicatorMessageAdapter` can also be created for a given `Replicator`
 instead of creating one via the `DistributedData` extension.
+
+## Durable Storage
+
+By default the data is only kept in memory. It is redundant since it is replicated to other nodes
+in the cluster, but if you stop all nodes the data is lost, unless you have saved it
+elsewhere.
+
+Entries can be configured to be durable, i.e. stored on local disk on each node. The stored data will be loaded
+the next time the replicator is started, i.e. when the actor system is restarted. This means data will survive as
+long as at least one node from the old cluster takes part in a new cluster. The keys of the durable entries
+are configured with:
+
+```
+akka.cluster.distributed-data.durable.keys = ["a", "b", "durable*"]
+```
+
+Prefix matching is supported by using `*` at the end of a key.
+
+All entries can be made durable by specifying:
+
+```
+akka.cluster.distributed-data.durable.keys = ["*"]
+```
+
+@scala[[LMDB](https://symas.com/products/lightning-memory-mapped-database/)]@java[[LMDB](https://github.com/lmdbjava/lmdbjava/)] is the default storage implementation. It is
+possible to replace that with another implementation by implementing the actor protocol described in
+`akka.cluster.ddata.DurableStore` and defining the `akka.cluster.distributed-data.durable.store-actor-class`
+property for the new implementation.
+
+The location of the files for the data is configured with:
+
+Scala
+: ```
+# Directory of LMDB file. There are two options:
+# 1. A relative or absolute path to a directory that ends with 'ddata';
+#    the full name of the directory will contain the name of the ActorSystem
+#    and its remote port.
+# 2. Otherwise the path is used as is, as a relative or absolute path to
+#    a directory.
+akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
+```
+
+Java
+: ```
+# Directory of LMDB file. There are two options:
+# 1. A relative or absolute path to a directory that ends with 'ddata';
+#    the full name of the directory will contain the name of the ActorSystem
+#    and its remote port.
+# 2. Otherwise the path is used as is, as a relative or absolute path to
+#    a directory.
+akka.cluster.distributed-data.durable.lmdb.dir = "ddata"
+```
+
+When running in production you may want to configure the directory to a specific
+path (option 2), since the default directory contains the remote port of the
+actor system to make the name unique. If using a dynamically assigned
+port (0) it will be different each time and the previously stored data
+will not be loaded.
+
+Making the data durable has a performance cost. By default, each update is flushed
+to disk before the `UpdateSuccess` reply is sent. For better performance, but with the risk of losing
+the last writes if the JVM crashes, you can enable write behind mode. Changes are then accumulated during
+a time period before they are written to LMDB and flushed to disk. Enabling write behind is especially
+efficient when performing many writes to the same key, because only the last value for each key
+is serialized and stored. The risk of losing writes if the JVM crashes is small, since the
+data is typically replicated to other nodes immediately according to the given `WriteConsistency`.
+
+```
+akka.cluster.distributed-data.durable.lmdb.write-behind-interval = 200 ms
+```
+
+Note that you should be prepared to receive `WriteFailure` as a reply to an `Update` of a
+durable entry if the data could not be stored for some reason. When enabling `write-behind-interval`
+such errors will only be logged and `UpdateSuccess` will still be the reply to the `Update`.
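+
+As a rough sketch (assuming the `Replicator.UpdateResponse` hierarchy of the typed API in
+`akka.cluster.ddata.typed.scaladsl`), a response handler can treat anything other than
+`UpdateSuccess` as a failed or unconfirmed write:
+
+```scala
+import akka.cluster.ddata.ReplicatedData
+import akka.cluster.ddata.typed.scaladsl.Replicator
+
+// Sketch only: classify the reply to an Update of a durable entry.
+// With write-behind enabled, durable store errors are only logged by the
+// replicator and UpdateSuccess is still the reply.
+def updateSucceeded[A <: ReplicatedData](reply: Replicator.UpdateResponse[A]): Boolean =
+  reply match {
+    case _: Replicator.UpdateSuccess[_] => true  // stored, and flushed to disk unless write-behind is enabled
+    case _                              => false // e.g. a timeout or a failed durable write; consider retrying
+  }
+```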
+
+There is one important caveat when it comes to pruning of [CRDT Garbage](#crdt-garbage) for durable data.
+If an old data entry that was never pruned is injected and merged with existing data after
+the pruning markers have been removed, the value will not be correct. The time-to-live
+of the markers is defined by the configuration property
+`akka.cluster.distributed-data.durable.remove-pruning-marker-after` and is on the order of days.
+This could happen if a node with durable data did not participate in the pruning
+(e.g. it was shut down) and was later started after this time. A node with durable data should not
+be stopped for longer than this duration, and if it is joining again after this
+duration its data should first be manually removed (from the lmdb directory).
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/typed/index-cluster.md b/akka-docs/src/main/paradox/typed/index-cluster.md
index 72fc0d8303..78721b4f5e 100644
--- a/akka-docs/src/main/paradox/typed/index-cluster.md
+++ b/akka-docs/src/main/paradox/typed/index-cluster.md
@@ -5,11 +5,12 @@
 @@@ index

 * [cluster](cluster.md)
-* [cluster-specification](cluster-specification.md)
+* [cluster-specification](cluster-concepts.md)
 * [cluster-membership](cluster-membership.md)
 * [distributed-data](distributed-data.md)
 * [cluster-singleton](cluster-singleton.md)
 * [cluster-sharding](cluster-sharding.md)
+* [cluster-sharding-specification](cluster-sharding-concepts.md)
 * [serialization](../serialization.md)
 * [serialization-jackson](../serialization-jackson.md)
 * [multi-jvm-testing](../multi-jvm-testing.md)