diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 6839d8e509..6960de0eb3 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -58,22 +58,24 @@ akka.cluster.sharding { # used for the internal persistence of ClusterSharding. If not defined # the default journal plugin is used. Note that this is not related to # persistence used by the entity actors. + # Only used when state-store-mode=persistence journal-plugin-id = "" # Absolute path to the snapshot plugin configuration entity that is to be # used for the internal persistence of ClusterSharding. If not defined # the default snapshot plugin is used. Note that this is not related to # persistence used by the entity actors. + # Only used when state-store-mode=persistence snapshot-plugin-id = "" - # Parameter which determines how the coordinator will be store a state - # valid values either "persistence" or "ddata" - # The "ddata" mode is experimental, since it depends on the experimental - # module akka-distributed-data-experimental. - state-store-mode = "persistence" + # Defines how the coordinator stores its state. Same is also used by the + # shards for rememberEntities. + # Valid values are "ddata" or "persistence". + state-store-mode = "ddata" # The shard saves persistent snapshots after this number of persistent # events. Snapshots are used to reduce recovery times. + # Only used when state-store-mode=persistence snapshot-after = 1000 # Setting for the default shard allocation strategy @@ -87,11 +89,11 @@ akka.cluster.sharding { } # Timeout of waiting the initial distributed state (an initial state will be queried again if the timeout happened) - # works only for state-store-mode = "ddata" + # Only used when state-store-mode=ddata waiting-for-state-timeout = 5 s # Timeout of waiting for update the distributed state (update will be retried if the timeout happened) - # works only for state-store-mode = "ddata" + # Only used when state-store-mode=ddata updating-state-timeout = 5 s # The shard uses this strategy to determines how to recover the underlying entity actors. The strategy is only used @@ -113,12 +115,13 @@ akka.cluster.sharding { # be the same as "akka.cluster.sharding.role". coordinator-singleton = ${akka.cluster.singleton} - # Settings for the Distributed Data replicator. Used when state-store-mode=ddata. + # Settings for the Distributed Data replicator. # Same layout as akka.cluster.distributed-data. # The "role" of the distributed-data configuration is not used. The distributed-data # role will be the same as "akka.cluster.sharding.role". # Note that there is one Replicator per role and it's not possible # to have different distributed-data settings for different sharding entity types. + # Only used when state-store-mode=ddata distributed-data = ${akka.cluster.distributed-data} distributed-data { durable.keys = ["shard-*"] diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala index 2cfd35c2a6..2d9e246703 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala @@ -38,6 +38,7 @@ object RemoveInternalClusterShardingDataSpec { akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec" akka.cluster.sharding.snapshot-after = 5 + akka.cluster.sharding.state-store-mode = persistence """ val extractEntityId: ShardRegion.ExtractEntityId = { diff --git a/akka-docs/rst/intro/getting-started.rst b/akka-docs/rst/intro/getting-started.rst index 65d346a765..c4d93f7012 100644 --- a/akka-docs/rst/intro/getting-started.rst +++ b/akka-docs/rst/intro/getting-started.rst @@ -37,6 +37,12 @@ Akka is very modular and consists of several JARs containing different features. - ``akka-cluster`` – Cluster membership management, elastic routers. +- ``akka-cluster-sharding`` – Cluster Sharding of actors. + +- ``akka-cluster-tools`` – Additoinal Cluster utilities, such as Singleton, Pub/Sub and Client. + +- ``akka-distributed-data`` – Cluster data with CRDTs. + - ``akka-osgi`` – Utilities for using Akka in OSGi containers - ``akka-osgi-aries`` – Aries blueprint for provisioning actor systems diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index 89fc53b44d..d1d3d0cf99 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -38,7 +38,7 @@ This is how an entity actor may look like: .. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-actor -The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state. +The above actor uses event sourcing and the support provided in ``AbstractPersistentActor`` to store its state. It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover its state if it is valuable. @@ -167,9 +167,8 @@ must be to begin the rebalancing. This strategy can be replaced by an applicatio implementation. The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with -:ref:`persistence-java` to survive failures. Since it is running in a cluster :ref:`persistence-java` -must be configured with a distributed journal. When a crashed or unreachable coordinator -node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton +:ref:`distributed_data-java` or :ref:`persistence-java` to survive failures. When a crashed or +unreachable coordinator node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton actor will take over and the state is recovered. During such a failure period shards with known location are still available, while messages for new (unknown) shards are buffered until the new ``ShardCoordinator`` becomes available. @@ -185,21 +184,39 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards. -.. _cluster_sharding_ddata_java: +.. _cluster_sharding_mode_java: + +Distributed Data vs. Persistence Mode +------------------------------------- + +The state of the coordinator and the state of :ref:`cluster_sharding_remembering_java` of the shards +are persistent (durable) to survive failures. :ref:`distributed_data_java` or :ref:`persistence-java` +can be used for the storage. Distributed Data is used by default. + +The functionality when using the two modes is the same. If your sharded entities are not using Akka Persistence +themselves it is more convenient to use the Distributed Data mode, since then you don't have to +setup and operate a separate data store (e.g. Cassandra) for persistence. Aside from that, there are +no major reasons for using one mode over the the other. + +It's important to use the same mode on all nodes in the cluster, i.e. it's not possible to perform +a rolling upgrade to change this setting. Distributed Data Mode ---------------------- +^^^^^^^^^^^^^^^^^^^^^ -Instead of using :ref:`persistence-java` it is possible to use the :ref:`distributed_data_java` module -as storage for the state of the sharding coordinator. In such case the state of the -``ShardCoordinator`` will be replicated inside a cluster by the :ref:`distributed_data_java` module with -``WriteMajority``/``ReadMajority`` consistency. +This mode is enabled with configuration (enabled by default):: -This mode can be enabled by setting configuration property:: + akka.cluster.sharding.state-store-mode = ddata - akka.cluster.sharding.state-store-mode = ddata +The state of the ``ShardCoordinator`` will be replicated inside a cluster by the +:ref:`distributed_data_java` module with ``WriteMajority``/``ReadMajority`` consistency. +The state of the coordinator is not durable, it's not stored to disk. When all nodes in +the cluster have been stopped the state is lost and not needed any more. -It is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of +The state of :ref:`cluster_sharding_remembering_java` is also durable, i.e. it is stored to +disk. The stored entities are started also after a complete cluster restart. + +Cluster Sharding is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of all nodes for some entity types and another subset for other entity types. Each such replicator has a name that contains the node role and therefore the role configuration must be the same on all nodes in the cluster, i.e. you can't change the roles when performing a rolling upgrade. @@ -208,14 +225,18 @@ The settings for Distributed Data is configured in the the section ``akka.cluster.sharding.distributed-data``. It's not possible to have different ``distributed-data`` settings for different sharding entity types. -You must explicitly add the ``akka-distributed-data`` dependency to your build if -you use this mode. It is possible to remove ``akka-persistence`` dependency from a project if it -is not used in user code. +Persistence Mode +^^^^^^^^^^^^^^^^ -.. warning:: +This mode is enabled with configuration:: + + akka.cluster.sharding.state-store-mode = persistence + +Since it is running in a cluster :ref:`persistence-java` must be configured with a distributed journal. + +You must explicitly add the ``akka-persistence`` dependency to your build if +you use this mode. - The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since - it depends on the experimental Distributed Data module. Startup after minimum number of members --------------------------------------- @@ -250,6 +271,8 @@ then supposed to stop itself. Incoming messages will be buffered by the ``Shard` between reception of ``Passivate`` and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity. +.. _cluster_sharding_remembering_java: + Remembering Entities -------------------- @@ -262,7 +285,7 @@ a ``Passivate`` message must be sent to the parent of the entity actor, otherwis entity will be automatically restarted after the entity restart backoff specified in the configuration. -When :ref:`cluster_sharding_ddata_java` is used the identifiers of the entities are +When :ref:`Distributed Data mode ` is used the identifiers of the entities are stored in :ref:`ddata_durable_java` of Distributed Data. You may want to change the configuration of the akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since the default directory contains the remote port of the actor system. If using a dynamically diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 24834bae94..42e543e84a 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -322,7 +322,24 @@ WeaklyUp akka.cluster.allow-weakly-up-members = off You should not run a cluster with this feature enabled on some nodes and disabled on some. Therefore -you might need to enable/disable it in configuration when performing rolling upgrade from 2.4.x to 2.5.0. +you might need to enable/disable it in configuration when performing rolling upgrade from 2.4.x to 2.5.0. + +Cluster Sharding state-store-mode +--------------------------------- + +Distributed Data mode is now the default ``state-store-mode`` for Cluster Sharding. The persistence mode +is also supported. Read more in the documentation :ref:`for Scala ` or +the documentation :ref:`for Java `. + +It's important to use the same mode on all nodes in the cluster, i.e. if you perform a rolling upgrade +from 2.4.16 you might need to change the ``state-store-mode`` to be the same (``persistence`` is default +in 2.4.x):: + + akka.cluster.sharding.state-store-mode = persistence + +Note that the stored :ref:`cluster_sharding_remembering_java` data with ``persistence`` mode cannot +be migrated to the ``data`` mode. Such entities must be started again in some other way when using +``ddata`` mode. Cluster Management Command Line Tool ------------------------------------ @@ -336,6 +353,28 @@ See documentation of `akka/akka-cluster-management `_ or similar instead. +Distributed Data +================ + +Map allow generic type for the keys +----------------------------------- + +In 2.4 the key of any Distributed Data map always needed to be of type String. In 2.5 you can use any type for the key. This means that +every map (ORMap, LWWMap, PNCounterMap, ORMultiMap) now takes an extra type parameter to specify the key type. To migrate +existing code from 2.4 to 2.5 you simple add String as key type, for example: `ORMultiMap[Foo]` becomes `ORMultiMap[String, Foo]`. +`PNCounterMap` didn't take a type parameter in version 2.4, so `PNCounterMap` in 2.4 becomes `PNCounterMap[String]` in 2.5. +Java developers should use `<>` instead of `[]`, e.g: `PNCounterMap`. + +**NOTE: Even though the interface is not compatible between 2.4 and 2.5, the binary protocol over the wire is (as long +as you use String as key type). This means that 2.4 nodes can synchronize with 2.5 nodes.** + +Subscribers +----------- + +When an entity is removed subscribers will not receive ``Replicator.DataDeleted`` any more. +They will receive ``Replicator.Deleted`` instead. + + Persistence =========== @@ -409,23 +448,3 @@ We also anticipate to replace the uses of Agents by the upcoming Akka Typed, so If you use Agents and would like to take over the maintanance thereof, please contact the team on gitter or github. -Distributed Data -================ - -Map allow generic type for the keys ------------------------------------ - -In 2.4 the key of any Distributed Data map always needed to be of type String. In 2.5 you can use any type for the key. This means that -every map (ORMap, LWWMap, PNCounterMap, ORMultiMap) now takes an extra type parameter to specify the key type. To migrate -existing code from 2.4 to 2.5 you simple add String as key type, for example: `ORMultiMap[Foo]` becomes `ORMultiMap[String, Foo]`. -`PNCounterMap` didn't take a type parameter in version 2.4, so `PNCounterMap` in 2.4 becomes `PNCounterMap[String]` in 2.5. -Java developers should use `<>` instead of `[]`, e.g: `PNCounterMap`. - -**NOTE: Even though the interface is not compatible between 2.4 and 2.5, the binary protocol over the wire is (as long -as you use String as key type). This means that 2.4 nodes can synchronize with 2.5 nodes.** - -Subscribers ------------ - -When an entity is removed subscribers will not receive ``Replicator.DataDeleted`` any more. -They will receive ``Replicator.Deleted`` instead. diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 4ab89deaf9..2e2cccc94e 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -170,9 +170,8 @@ must be to begin the rebalancing. This strategy can be replaced by an applicatio implementation. The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with -:ref:`persistence-scala` to survive failures. Since it is running in a cluster :ref:`persistence-scala` -must be configured with a distributed journal. When a crashed or unreachable coordinator -node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton +:ref:`distributed_data_scala` or :ref:`persistence-scala` to survive failures. When a crashed or +unreachable coordinator node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton actor will take over and the state is recovered. During such a failure period shards with known location are still available, while messages for new (unknown) shards are buffered until the new ``ShardCoordinator`` becomes available. @@ -188,21 +187,39 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards. -.. _cluster_sharding_ddata_scala: +.. _cluster_sharding_mode_scala: + +Distributed Data vs. Persistence Mode +------------------------------------- + +The state of the coordinator and the state of :ref:`cluster_sharding_remembering_scala` of the shards +are persistent (durable) to survive failures. :ref:`distributed_data_scala` or :ref:`persistence-scala` +can be used for the storage. Distributed Data is used by default. + +The functionality when using the two modes is the same. If your sharded entities are not using Akka Persistence +themselves it is more convenient to use the Distributed Data mode, since then you don't have to +setup and operate a separate data store (e.g. Cassandra) for persistence. Aside from that, there are +no major reasons for using one mode over the the other. + +It's important to use the same mode on all nodes in the cluster, i.e. it's not possible to perform +a rolling upgrade to change this setting. Distributed Data Mode ---------------------- +^^^^^^^^^^^^^^^^^^^^^ -Instead of using :ref:`persistence-scala` it is possible to use the :ref:`distributed_data_scala` module -as storage for the state of the sharding coordinator. In such case the state of the -``ShardCoordinator`` will be replicated inside a cluster by the :ref:`distributed_data_scala` module with -``WriteMajority``/``ReadMajority`` consistency. +This mode is enabled with configuration (enabled by default):: -This mode can be enabled by setting configuration property:: + akka.cluster.sharding.state-store-mode = ddata - akka.cluster.sharding.state-store-mode = ddata +The state of the ``ShardCoordinator`` will be replicated inside a cluster by the +:ref:`distributed_data_scala` module with ``WriteMajority``/``ReadMajority`` consistency. +The state of the coordinator is not durable, it's not stored to disk. When all nodes in +the cluster have been stopped the state is lost and not needed any more. -It is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of +The state of :ref:`cluster_sharding_remembering_scala` is also durable, i.e. it is stored to +disk. The stored entities are started also after a complete cluster restart. + +Cluster Sharding is using its own Distributed Data ``Replicator`` per node role. In this way you can use a subset of all nodes for some entity types and another subset for other entity types. Each such replicator has a name that contains the node role and therefore the role configuration must be the same on all nodes in the cluster, i.e. you can't change the roles when performing a rolling upgrade. @@ -211,14 +228,18 @@ The settings for Distributed Data is configured in the the section ``akka.cluster.sharding.distributed-data``. It's not possible to have different ``distributed-data`` settings for different sharding entity types. -You must explicitly add the ``akka-distributed-data`` dependency to your build if -you use this mode. It is possible to remove ``akka-persistence`` dependency from a project if it -is not used in user code. +Persistence Mode +^^^^^^^^^^^^^^^^ -.. warning:: +This mode is enabled with configuration:: + + akka.cluster.sharding.state-store-mode = persistence + +Since it is running in a cluster :ref:`persistence-scala` must be configured with a distributed journal. + +You must explicitly add the ``akka-persistence`` dependency to your build if +you use this mode. - The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since - it depends on the experimental Distributed Data module. Startup after minimum number of members --------------------------------------- @@ -253,6 +274,8 @@ then supposed to stop itself. Incoming messages will be buffered by the ``Shard` between reception of ``Passivate`` and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity. +.. _cluster_sharding_remembering_scala: + Remembering Entities -------------------- @@ -265,7 +288,7 @@ a ``Passivate`` message must be sent to the parent of the entity actor, otherwis entity will be automatically restarted after the entity restart backoff specified in the configuration. -When :ref:`cluster_sharding_ddata_scala` is used the identifiers of the entities are +When :ref:`Distributed Data mode ` is used the identifiers of the entities are stored in :ref:`ddata_durable_scala` of Distributed Data. You may want to change the configuration of the akka.cluster.sharding.distributed-data.durable.lmdb.dir`, since the default directory contains the remote port of the actor system. If using a dynamically diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 93505af7e2..2299726aea 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -175,12 +175,12 @@ object AkkaBuild extends Build { lazy val clusterSharding = Project( id = "akka-cluster-sharding", base = file("akka-cluster-sharding"), - // TODO akka-distributed-data dependency should be provided in pom.xml artifact. + // TODO akka-persistence dependency should be provided in pom.xml artifact. // If I only use "provided" here it works, but then we can't run tests. // Scope "test" is alright in the pom.xml, but would have been nicer with // provided. dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm", - persistence % "compile;test->provided", distributedData % "provided;test", clusterTools) + distributedData % "compile;test->provided", persistence % "provided;test", clusterTools) ).configs(MultiJvm) lazy val distributedData = Project(