Use ddata mode as the default for Cluster Sharding, #17963

Patrik Nordwall 2017-01-24 18:49:01 +01:00
parent 94e40460a4
commit 9616e997fc
7 changed files with 145 additions and 70 deletions

@ -58,22 +58,24 @@ akka.cluster.sharding {
# used for the internal persistence of ClusterSharding. If not defined
# the default journal plugin is used. Note that this is not related to
# persistence used by the entity actors.
# Only used when state-store-mode=persistence
journal-plugin-id = ""
# Absolute path to the snapshot plugin configuration entity that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default snapshot plugin is used. Note that this is not related to
# persistence used by the entity actors.
# Only used when state-store-mode=persistence
snapshot-plugin-id = ""
# Parameter which determines how the coordinator will be store a state
# valid values either "persistence" or "ddata"
# The "ddata" mode is experimental, since it depends on the experimental
# module akka-distributed-data-experimental.
state-store-mode = "persistence"
# Defines how the coordinator stores its state. Same is also used by the
# shards for rememberEntities.
# Valid values are "ddata" or "persistence".
state-store-mode = "ddata"
# The shard saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# Only used when state-store-mode=persistence
snapshot-after = 1000
# Setting for the default shard allocation strategy
@ -87,11 +89,11 @@ akka.cluster.sharding {
}
# Timeout of waiting the initial distributed state (an initial state will be queried again if the timeout happened)
# works only for state-store-mode = "ddata"
# Only used when state-store-mode=ddata
waiting-for-state-timeout = 5 s
# Timeout of waiting for update the distributed state (update will be retried if the timeout happened)
# works only for state-store-mode = "ddata"
# Only used when state-store-mode=ddata
updating-state-timeout = 5 s
# The shard uses this strategy to determine how to recover the underlying entity actors. The strategy is only used
@ -113,12 +115,13 @@ akka.cluster.sharding {
# be the same as "akka.cluster.sharding.role".
coordinator-singleton = ${akka.cluster.singleton}
# Settings for the Distributed Data replicator. Used when state-store-mode=ddata.
# Settings for the Distributed Data replicator.
# Same layout as akka.cluster.distributed-data.
# The "role" of the distributed-data configuration is not used. The distributed-data
# role will be the same as "akka.cluster.sharding.role".
# Note that there is one Replicator per role and it's not possible
# to have different distributed-data settings for different sharding entity types.
# Only used when state-store-mode=ddata
distributed-data = ${akka.cluster.distributed-data}
distributed-data {
durable.keys = ["shard-*"]
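As a rough sketch of how an application might override a few of the settings shown above when creating its ``ActorSystem`` (the values and the system name are illustrative, not recommendations)::

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  object ShardingConfigOverride extends App {
    // illustrative overrides of the reference.conf entries shown above
    val config = ConfigFactory.parseString(
      """
      akka.cluster.sharding.snapshot-after = 500
      akka.cluster.sharding.waiting-for-state-timeout = 10 s
      akka.cluster.sharding.updating-state-timeout = 10 s
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("ShardingSystem", config)
  }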

@ -38,6 +38,7 @@ object RemoveInternalClusterShardingDataSpec {
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec"
akka.cluster.sharding.snapshot-after = 5
akka.cluster.sharding.state-store-mode = persistence
"""
val extractEntityId: ShardRegion.ExtractEntityId = {
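For context, a minimal sketch of what such message extractors typically look like; the ``Envelope`` type and the shard count below are hypothetical, not the ones used in this spec::

  import akka.cluster.sharding.ShardRegion

  // hypothetical message type carrying the entity identifier
  final case class Envelope(entityId: Long, payload: Any)

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case Envelope(id, payload) => (id.toString, payload)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(id, _) => (id % 100).toString // 100 shards, tune to cluster size
  }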

@ -37,6 +37,12 @@ Akka is very modular and consists of several JARs containing different features.
- ``akka-cluster`` Cluster membership management, elastic routers.
- ``akka-cluster-sharding`` Cluster Sharding of actors.
- ``akka-cluster-tools`` Additional Cluster utilities, such as Singleton, Pub/Sub and Client.
- ``akka-distributed-data`` Cluster data with CRDTs.
- ``akka-osgi`` Utilities for using Akka in OSGi containers
- ``akka-osgi-aries`` Aries blueprint for provisioning actor systems

@ -38,7 +38,7 @@ This is how an entity actor may look like:
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-actor
The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state.
The above actor uses event sourcing and the support provided in ``AbstractPersistentActor`` to store its state.
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if it is valuable.
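A rough Scala sketch of such a counter entity (simplified; the message and event names are illustrative, not the ones in the included snippet)::

  import akka.persistence.PersistentActor

  object Counter {
    case object Increment
    case object Get
    final case class Incremented(delta: Int)
  }

  class Counter extends PersistentActor {
    import Counter._

    // one event stream per sharded entity, derived from the entity id
    override def persistenceId: String = "Counter-" + self.path.name

    private var count = 0

    override def receiveRecover: Receive = {
      case Incremented(delta) => count += delta
    }

    override def receiveCommand: Receive = {
      case Increment => persist(Incremented(1)) { evt => count += evt.delta }
      case Get       => sender() ! count
    }
  }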
@ -167,9 +167,8 @@ must be to begin the rebalancing. This strategy can be replaced by an applicatio
implementation.
The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with
:ref:`persistence-java` to survive failures. Since it is running in a cluster :ref:`persistence-java`
must be configured with a distributed journal. When a crashed or unreachable coordinator
node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
:ref:`distributed_data_java` or :ref:`persistence-java` to survive failures. When a crashed or
unreachable coordinator node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
actor will take over and the state is recovered. During such a failure period shards
with known location are still available, while messages for new (unknown) shards
are buffered until the new ``ShardCoordinator`` becomes available.
@ -185,21 +184,39 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
also add latency. This should be considered when designing the application specific
shard resolution, e.g. to avoid too fine grained shards.
.. _cluster_sharding_ddata_java:
.. _cluster_sharding_mode_java:
Distributed Data vs. Persistence Mode
-------------------------------------
The state of the coordinator and the state of :ref:`cluster_sharding_remembering_java` of the shards
are persistent (durable) to survive failures. :ref:`distributed_data_java` or :ref:`persistence-java`
can be used for the storage. Distributed Data is used by default.
The functionality when using the two modes is the same. If your sharded entities are not using Akka Persistence
themselves it is more convenient to use the Distributed Data mode, since then you don't have to
set up and operate a separate data store (e.g. Cassandra) for persistence. Aside from that, there are
no major reasons for using one mode over the other.
It's important to use the same mode on all nodes in the cluster, i.e. it's not possible to perform
a rolling upgrade to change this setting.
Distributed Data Mode
---------------------
^^^^^^^^^^^^^^^^^^^^^
Instead of using :ref:`persistence-java` it is possible to use the :ref:`distributed_data_java` module
as storage for the state of the sharding coordinator. In such case the state of the
``ShardCoordinator`` will be replicated inside a cluster by the :ref:`distributed_data_java` module with
``WriteMajority``/``ReadMajority`` consistency.
This mode can be enabled by setting configuration property::
This mode is enabled with configuration (enabled by default)::
akka.cluster.sharding.state-store-mode = ddata
It is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of
The state of the ``ShardCoordinator`` will be replicated inside a cluster by the
:ref:`distributed_data_java` module with ``WriteMajority``/``ReadMajority`` consistency.
The state of the coordinator is not durable; it is not stored to disk. When all nodes in
the cluster have been stopped the state is lost and not needed any more.
The state of :ref:`cluster_sharding_remembering_java`, in contrast, is durable, i.e. it is stored to
disk. The stored entities are started again even after a complete cluster restart.
Cluster Sharding is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of
all nodes for some entity types and another subset for other entity types. Each such replicator has a name
that contains the node role and therefore the role configuration must be the same on all nodes in the
cluster, i.e. you can't change the roles when performing a rolling upgrade.
@ -208,14 +225,18 @@ The settings for Distributed Data is configured in the the section
``akka.cluster.sharding.distributed-data``. It's not possible to have different
``distributed-data`` settings for different sharding entity types.
You must explicitly add the ``akka-distributed-data`` dependency to your build if
you use this mode. It is possible to remove the ``akka-persistence`` dependency from a project if it
is not used in user code.
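In sbt that amounts to something like the following (the version number is a placeholder for the Akka release actually in use)::

  // build.sbt
  libraryDependencies += "com.typesafe.akka" %% "akka-distributed-data" % "2.5.0"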
Persistence Mode
^^^^^^^^^^^^^^^^
.. warning::
This mode is enabled with configuration::
akka.cluster.sharding.state-store-mode = persistence
Since it is running in a cluster :ref:`persistence-java` must be configured with a distributed journal.
You must explicitly add the ``akka-persistence`` dependency to your build if
you use this mode.
The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since
it depends on the experimental Distributed Data module.
Startup after minimum number of members
---------------------------------------
@ -250,6 +271,8 @@ then supposed to stop itself. Incoming messages will be buffered by the ``Shard`
between reception of ``Passivate`` and termination of the entity. Such buffered messages
are thereafter delivered to a new incarnation of the entity.
.. _cluster_sharding_remembering_java:
Remembering Entities
--------------------
@ -262,7 +285,7 @@ a ``Passivate`` message must be sent to the parent of the entity actor, otherwis
entity will be automatically restarted after the entity restart backoff specified in
the configuration.
When :ref:`cluster_sharding_ddata_java` is used the identifiers of the entities are
When :ref:`Distributed Data mode <cluster_sharding_mode_java>` is used the identifiers of the entities are
stored in :ref:`ddata_durable_java` of Distributed Data. You may want to change the
configuration of ``akka.cluster.sharding.distributed-data.durable.lmdb.dir``, since
the default directory contains the remote port of the actor system. If using a dynamically

@ -324,6 +324,23 @@ WeaklyUp
You should not run a cluster with this feature enabled on some nodes and disabled on others. Therefore
you might need to enable/disable it in configuration when performing a rolling upgrade from 2.4.x to 2.5.0.
Cluster Sharding state-store-mode
---------------------------------
Distributed Data mode is now the default ``state-store-mode`` for Cluster Sharding. The persistence mode
is also supported. Read more in the documentation :ref:`for Scala <cluster_sharding_mode_scala>` or
the documentation :ref:`for Java <cluster_sharding_mode_java>`.
It's important to use the same mode on all nodes in the cluster, i.e. if you perform a rolling upgrade
from 2.4.16 you might need to change the ``state-store-mode`` to be the same (``persistence`` is the default
in 2.4.x)::
akka.cluster.sharding.state-store-mode = persistence
Note that the stored :ref:`cluster_sharding_remembering_java` data with ``persistence`` mode cannot
be migrated to the ``ddata`` mode. Such entities must be started again in some other way when using
``ddata`` mode.
Cluster Management Command Line Tool
------------------------------------
@ -336,6 +353,28 @@ See documentation of `akka/akka-cluster-management <https://github.com/akka/akka
The command line script for cluster management has been deprecated and is scheduled for removal
in the next major version. Use the HTTP API with `curl <https://curl.haxx.se/>`_ or similar instead.
Distributed Data
================
Maps allow generic types for the keys
-------------------------------------
In 2.4 the key of any Distributed Data map always needed to be of type String. In 2.5 you can use any type for the key. This means that
every map (``ORMap``, ``LWWMap``, ``PNCounterMap``, ``ORMultiMap``) now takes an extra type parameter to specify the key type. To migrate
existing code from 2.4 to 2.5 you simply add ``String`` as the key type, for example: ``ORMultiMap[Foo]`` becomes ``ORMultiMap[String, Foo]``.
``PNCounterMap`` didn't take a type parameter in version 2.4, so ``PNCounterMap`` in 2.4 becomes ``PNCounterMap[String]`` in 2.5.
Java developers should use ``<>`` instead of ``[]``, e.g. ``PNCounterMap<String>``.
**NOTE: Even though the interface is not compatible between 2.4 and 2.5, the binary protocol over the wire is (as long
as you use String as key type). This means that 2.4 nodes can synchronize with 2.5 nodes.**
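A small Scala sketch of the key-type change described above (the ``Foo`` value type is illustrative)::

  import akka.cluster.ddata.{ ORMultiMap, PNCounterMap }

  object KeyTypeMigration {
    final case class Foo(name: String)

    // 2.4: ORMultiMap[Foo]               2.5: ORMultiMap[String, Foo]
    val multiMap: ORMultiMap[String, Foo] = ORMultiMap.empty[String, Foo]

    // 2.4: PNCounterMap (no type param)  2.5: PNCounterMap[String]
    val counters: PNCounterMap[String] = PNCounterMap.empty[String]
  }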
Subscribers
-----------
When an entry is deleted, subscribers will not receive ``Replicator.DataDeleted`` any more.
They will receive ``Replicator.Deleted`` instead.
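A sketch of the corresponding change in a subscriber's receive block (the ``ORSetKey`` and the actor are illustrative, and the actor is assumed to have subscribed to the key elsewhere)::

  import akka.actor.Actor
  import akka.cluster.ddata.{ ORSetKey, Replicator }

  class SetSubscriber extends Actor {
    val DataKey: ORSetKey[String] = ORSetKey[String]("my-set")

    def receive: Receive = {
      case c @ Replicator.Changed(DataKey) =>
        val elements = c.get(DataKey).elements // react to updates as before
      case Replicator.Deleted(DataKey) =>      // was Replicator.DataDeleted in 2.4
        context.stop(self)
    }
  }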
Persistence
===========
@ -409,23 +448,3 @@ We also anticipate to replace the uses of Agents by the upcoming Akka Typed, so
If you use Agents and would like to take over the maintenance thereof, please contact the team on Gitter or GitHub.
Distributed Data
================
Maps allow generic types for the keys
-------------------------------------
In 2.4 the key of any Distributed Data map always needed to be of type String. In 2.5 you can use any type for the key. This means that
every map (``ORMap``, ``LWWMap``, ``PNCounterMap``, ``ORMultiMap``) now takes an extra type parameter to specify the key type. To migrate
existing code from 2.4 to 2.5 you simply add ``String`` as the key type, for example: ``ORMultiMap[Foo]`` becomes ``ORMultiMap[String, Foo]``.
``PNCounterMap`` didn't take a type parameter in version 2.4, so ``PNCounterMap`` in 2.4 becomes ``PNCounterMap[String]`` in 2.5.
Java developers should use ``<>`` instead of ``[]``, e.g. ``PNCounterMap<String>``.
**NOTE: Even though the interface is not compatible between 2.4 and 2.5, the binary protocol over the wire is (as long
as you use String as key type). This means that 2.4 nodes can synchronize with 2.5 nodes.**
Subscribers
-----------
When an entry is deleted, subscribers will not receive ``Replicator.DataDeleted`` any more.
They will receive ``Replicator.Deleted`` instead.

@ -170,9 +170,8 @@ must be to begin the rebalancing. This strategy can be replaced by an applicatio
implementation.
The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with
:ref:`persistence-scala` to survive failures. Since it is running in a cluster :ref:`persistence-scala`
must be configured with a distributed journal. When a crashed or unreachable coordinator
node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
:ref:`distributed_data_scala` or :ref:`persistence-scala` to survive failures. When a crashed or
unreachable coordinator node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
actor will take over and the state is recovered. During such a failure period shards
with known location are still available, while messages for new (unknown) shards
are buffered until the new ``ShardCoordinator`` becomes available.
@ -188,21 +187,39 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
also add latency. This should be considered when designing the application specific
shard resolution, e.g. to avoid too fine grained shards.
.. _cluster_sharding_ddata_scala:
.. _cluster_sharding_mode_scala:
Distributed Data vs. Persistence Mode
-------------------------------------
The state of the coordinator and the state of :ref:`cluster_sharding_remembering_scala` of the shards
are persistent (durable) to survive failures. :ref:`distributed_data_scala` or :ref:`persistence-scala`
can be used for the storage. Distributed Data is used by default.
The functionality when using the two modes is the same. If your sharded entities are not using Akka Persistence
themselves it is more convenient to use the Distributed Data mode, since then you don't have to
set up and operate a separate data store (e.g. Cassandra) for persistence. Aside from that, there are
no major reasons for using one mode over the other.
It's important to use the same mode on all nodes in the cluster, i.e. it's not possible to perform
a rolling upgrade to change this setting.
Distributed Data Mode
---------------------
^^^^^^^^^^^^^^^^^^^^^
Instead of using :ref:`persistence-scala` it is possible to use the :ref:`distributed_data_scala` module
as storage for the state of the sharding coordinator. In such case the state of the
``ShardCoordinator`` will be replicated inside a cluster by the :ref:`distributed_data_scala` module with
``WriteMajority``/``ReadMajority`` consistency.
This mode can be enabled by setting configuration property::
This mode is enabled with configuration (enabled by default)::
akka.cluster.sharding.state-store-mode = ddata
It is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of
The state of the ``ShardCoordinator`` will be replicated inside a cluster by the
:ref:`distributed_data_scala` module with ``WriteMajority``/``ReadMajority`` consistency.
The state of the coordinator is not durable; it is not stored to disk. When all nodes in
the cluster have been stopped the state is lost and not needed any more.
The state of :ref:`cluster_sharding_remembering_scala`, in contrast, is durable, i.e. it is stored to
disk. The stored entities are started again even after a complete cluster restart.
Cluster Sharding is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of
all nodes for some entity types and another subset for other entity types. Each such replicator has a name
that contains the node role and therefore the role configuration must be the same on all nodes in the
cluster, i.e. you can't change the roles when performing a rolling upgrade.
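As an illustration, a sketch of restricting one entity type to nodes with a given role; the role name, entity actor and extractors are hypothetical, and cluster configuration (actor provider, seed nodes) is assumed to come from ``application.conf``::

  import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
  import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

  object RoleBasedSharding extends App {
    class Counter extends Actor {
      def receive: Receive = { case _ => () } // placeholder entity
    }

    final case class Envelope(entityId: Long, payload: Any)

    val extractEntityId: ShardRegion.ExtractEntityId = {
      case Envelope(id, payload) => (id.toString, payload)
    }
    val extractShardId: ShardRegion.ExtractShardId = {
      case Envelope(id, _) => (id % 100).toString
    }

    val system = ActorSystem("ClusterSystem")

    // nodes that list "counter-nodes" in akka.cluster.roles will host this
    // entity type and its role-specific Replicator
    val settings = ClusterShardingSettings(system).withRole("counter-nodes")

    val counterRegion: ActorRef = ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props(new Counter),
      settings = settings,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
  }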
@ -211,14 +228,18 @@ The settings for Distributed Data is configured in the the section
``akka.cluster.sharding.distributed-data``. It's not possible to have different
``distributed-data`` settings for different sharding entity types.
You must explicitly add the ``akka-distributed-data`` dependency to your build if
you use this mode. It is possible to remove the ``akka-persistence`` dependency from a project if it
is not used in user code.
Persistence Mode
^^^^^^^^^^^^^^^^
.. warning::
This mode is enabled with configuration::
akka.cluster.sharding.state-store-mode = persistence
Since it is running in a cluster :ref:`persistence-scala` must be configured with a distributed journal.
You must explicitly add the ``akka-persistence`` dependency to your build if
you use this mode.
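As an illustration, pointing the internal persistence of Cluster Sharding at a distributed journal, here assuming the ``akka-persistence-cassandra`` plugin and its default plugin ids (both are assumptions; adjust for the plugin actually used)::

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  object PersistenceModeSetup extends App {
    val config = ConfigFactory.parseString(
      """
      akka.cluster.sharding.state-store-mode = persistence
      # plugin ids as defined by the assumed akka-persistence-cassandra plugin
      akka.cluster.sharding.journal-plugin-id = "cassandra-journal"
      akka.cluster.sharding.snapshot-plugin-id = "cassandra-snapshot-store"
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
  }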
The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since
it depends on the experimental Distributed Data module.
Startup after minimum number of members
---------------------------------------
@ -253,6 +274,8 @@ then supposed to stop itself. Incoming messages will be buffered by the ``Shard`
between reception of ``Passivate`` and termination of the entity. Such buffered messages
are thereafter delivered to a new incarnation of the entity.
.. _cluster_sharding_remembering_scala:
Remembering Entities
--------------------
@ -265,7 +288,7 @@ a ``Passivate`` message must be sent to the parent of the entity actor, otherwis
entity will be automatically restarted after the entity restart backoff specified in
the configuration.
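For example, a common sketch is to passivate an idle entity via ``ReceiveTimeout`` (the timeout value is illustrative)::

  import scala.concurrent.duration._
  import akka.actor.{ Actor, PoisonPill, ReceiveTimeout }
  import akka.cluster.sharding.ShardRegion

  class IdleEntity extends Actor {
    // passivate after 2 minutes without messages
    context.setReceiveTimeout(2.minutes)

    def receive: Receive = {
      case ReceiveTimeout =>
        // ask the parent Shard to stop this entity gracefully
        context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
      case _ =>
        // handle normal messages here
    }
  }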
When :ref:`cluster_sharding_ddata_scala` is used the identifiers of the entities are
When :ref:`Distributed Data mode <cluster_sharding_mode_scala>` is used the identifiers of the entities are
stored in :ref:`ddata_durable_scala` of Distributed Data. You may want to change the
configuration of ``akka.cluster.sharding.distributed-data.durable.lmdb.dir``, since
the default directory contains the remote port of the actor system. If using a dynamically

@ -175,12 +175,12 @@ object AkkaBuild extends Build {
lazy val clusterSharding = Project(
id = "akka-cluster-sharding",
base = file("akka-cluster-sharding"),
// TODO akka-distributed-data dependency should be provided in pom.xml artifact.
// TODO akka-persistence dependency should be provided in pom.xml artifact.
// If I only use "provided" here it works, but then we can't run tests.
// Scope "test" is alright in the pom.xml, but would have been nicer with
// provided.
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
persistence % "compile;test->provided", distributedData % "provided;test", clusterTools)
distributedData % "compile;test->provided", persistence % "provided;test", clusterTools)
).configs(MultiJvm)
lazy val distributedData = Project(