diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
index 03804af920..68016d0029 100755
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
@@ -41,74 +41,82 @@ import akka.util.ByteString
 /**
  * This extension provides sharding functionality of actors in a cluster.
  * The typical use case is when you have many stateful actors that together consume
- * more resources (e.g. memory) than fit on one machine. You need to distribute them across
- * several nodes in the cluster and you want to be able to interact with them using their
- * logical identifier, but without having to care about their physical location in the cluster,
- * which might also change over time. It could for example be actors representing Aggregate Roots in
- * Domain-Driven Design terminology. Here we call these actors "entities". These actors
- * typically have persistent (durable) state, but this feature is not limited to
- * actors with persistent state.
+ * more resources (e.g. memory) than fit on one machine.
+ * - Distribution: You need to distribute them across several nodes in the cluster
+ * - Location Transparency: You need to interact with them using their logical identifier,
+ * without having to care about their physical location in the cluster, which can change over time.
  *
- * In this context sharding means that actors with an identifier, so called entities,
- * can be automatically distributed across multiple nodes in the cluster. Each entity
- * actor runs only at one place, and messages can be sent to the entity without requiring
- * the sender to know the location of the destination actor. This is achieved by sending
- * the messages via a [[ShardRegion]] actor provided by this extension, which knows how
- * to route the message with the entity id to the final destination.
+ * '''Entities''':
+ * It could for example be actors representing Aggregate Roots in Domain-Driven Design
+ * terminology. Here we call these actors "entities", which typically have persistent
+ * (durable) state, but this feature is not limited to persistent state actors.
  *
- * This extension is supposed to be used by first, typically at system startup on each node
- * in the cluster, registering the supported entity types with the [[ClusterSharding#start]]
- * method and then the `ShardRegion` actor for a named entity type can be retrieved with
- * [[ClusterSharding#shardRegion]]. Messages to the entities are always sent via the local
- * `ShardRegion`. Some settings can be configured as described in the `akka.cluster.sharding`
- * section of the `reference.conf`.
+ * '''Sharding''':
+ * In this context sharding means that actors with an identifier, known as entities,
+ * can be automatically distributed across multiple nodes in the cluster.
  *
+ * '''ShardRegion''':
+ * Each entity actor runs only at one place, and messages can be sent to the entity without
+ * requiring the sender to know the location of the destination actor. This is achieved by
+ * sending the messages via a [[ShardRegion]] actor, provided by this extension. The [[ShardRegion]]
+ * knows the shard mappings and routes inbound messages to the entity by its entity id.
+ * Messages to the entities are always sent via the local `ShardRegion`.
  * The `ShardRegion` actor is started on each node in the cluster, or group of nodes
  * tagged with a specific role. The `ShardRegion` is created with two application specific
  * functions to extract the entity identifier and the shard identifier from incoming messages.
+ *
+ * Typical usage of this extension:
+ * 1. At system startup on each cluster node, register the supported entity types with
+ * the [[ClusterSharding#start]] method
+ * 2. Retrieve the `ShardRegion` actor for a named entity type with [[ClusterSharding#shardRegion]]
+ * Settings can be configured as described in the `akka.cluster.sharding` section of the `reference.conf`.
+ *
+ * '''Shard and ShardCoordinator''':
  * A shard is a group of entities that will be managed together. For the first message in a
- * specific shard the `ShardRegion` requests the location of the shard from a central coordinator,
- * the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
+ * specific shard the `ShardRegion` requests the location of the shard from a central
+ * [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
  * owns the shard. The `ShardRegion` receives the decided home of the shard
  * and if that is the `ShardRegion` instance itself it will create a local child
  * actor representing the entity and direct all messages for that entity to it.
- * If the shard home is another `ShardRegion` instance messages will be forwarded
+ * If the shard home is another `ShardRegion` instance, messages will be forwarded
  * to that `ShardRegion` instance instead. While resolving the location of a
- * shard incoming messages for that shard are buffered and later delivered when the
+ * shard, incoming messages for that shard are buffered and later delivered when the
  * shard location is known. Subsequent messages to the resolved shard can be delivered
  * to the target destination immediately without involving the `ShardCoordinator`.
- *
- * To make sure that at most one instance of a specific entity actor is running somewhere
+ * To ensure that at most one instance of a specific entity actor is running somewhere
  * in the cluster it is important that all nodes have the same view of where the shards
  * are located. Therefore the shard allocation decisions are taken by the central
- * `ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on
+ * `ShardCoordinator`, a cluster singleton, i.e. one instance on
  * the oldest member among all cluster nodes or a group of nodes tagged with a specific
  * role. The oldest member can be determined by [[akka.cluster.Member#isOlderThan]].
  *
+ * '''Shard Rebalancing''':
  * To be able to use newly added members in the cluster the coordinator facilitates rebalancing
- * of shards, i.e. migrate entities from one node to another. In the rebalance process the
- * coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started.
- * That means they will start buffering incoming messages for that shard, in the same way as if the
+ * of shards, migrating entities from one node to another. In the rebalance process the
+ * coordinator first notifies all `ShardRegion` actors that a handoff for a shard has begun.
+ * `ShardRegion` actors will start buffering incoming messages for that shard, as they do when the
  * shard location is unknown. During the rebalance process the coordinator will not answer any
  * requests for the location of shards that are being rebalanced, i.e. local buffering will
- * continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard
- * will stop all entities in that shard by sending `PoisonPill` to them. When all entities have
- * been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed
- * to the coordinator. Thereafter the coordinator will reply to requests for the location of
- * the shard and thereby allocate a new home for the shard and then buffered messages in the
+ * continue until the handoff is complete. The `ShardRegion` responsible for the rebalanced shard
+ * will stop all entities in that shard by sending them a `PoisonPill`. When all entities have
+ * been terminated, the `ShardRegion` owning the entities will acknowledge to the coordinator that
+ * the handoff has completed. Thereafter the coordinator will reply to requests for the location of
+ * the shard, allocate a new home for the shard and then buffered messages in the
  * `ShardRegion` actors are delivered to the new location. This means that the state of the entities
  * are not transferred or migrated. If the state of the entities are of importance it should be
- * persistent (durable), e.g. with `akka-persistence`, so that it can be recovered at the new
+ * persistent (durable), e.g. with `akka-persistence` so that it can be recovered at the new
  * location.
  *
- * The logic that decides which shards to rebalance is defined in a plugable shard
- * allocation strategy. The default implementation [[ShardCoordinator.LeastShardAllocationStrategy]]
- * picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
- * They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
- * i.e. new members in the cluster. There is a configurable threshold of how large the difference
- * must be to begin the rebalancing. This strategy can be replaced by an application specific
- * implementation.
+ * '''Shard Allocation''':
+ * The logic deciding which shards to rebalance is defined in a pluggable shard allocation
+ * strategy. The default implementation [[ShardCoordinator.LeastShardAllocationStrategy]]
+ * picks shards for handoff from the `ShardRegion` with the highest number of previously allocated shards.
+ * They will then be allocated to the `ShardRegion` with the lowest number of previously allocated shards,
+ * i.e. new members in the cluster. There is a configurable `rebalance-threshold` for how large
+ * the difference must be to begin the rebalancing. This strategy can be replaced by an application
+ * specific implementation.
  *
+ * '''Recovery''':
  * The state of shard locations in the `ShardCoordinator` is stored with `akka-distributed-data` or
  * `akka-persistence` to survive failures. When a crashed or unreachable coordinator
  * node has been removed (via down) from the cluster a new `ShardCoordinator` singleton
@@ -116,6 +124,7 @@ import akka.util.ByteString
  * with known location are still available, while messages for new (unknown) shards
  * are buffered until the new `ShardCoordinator` becomes available.
  *
+ * '''Delivery Semantics''':
  * As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity
  * actor the order of the messages is preserved. As long as the buffer limit is not reached
  * messages are delivered on a best effort basis, with at-most once delivery semantics,
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index f797bce32c..6aaebb3cc7 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -423,7 +423,7 @@ object ShardRegion {
 /**
  * INTERNAL API
  *
- * This actor creates children entity actors on demand for the shards that it is told to be
+ * This actor creates child entity actors on demand for the shards that it is told to be
  * responsible for. It delegates messages targeted to other shards to the responsible
  * `ShardRegion` actor on other nodes.
  *
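
The updated Scaladoc describes the typical usage: register each entity type with [[ClusterSharding#start]] at system startup and then message entities through the local `ShardRegion` obtained from [[ClusterSharding#shardRegion]]. For reference, a minimal sketch of that pattern with the classic API follows; the `Counter` entity, the `EntityEnvelope` wrapper and the shard count are hypothetical and not part of this diff, and the node is assumed to be configured as a cluster member (provider and seed-node configuration not shown).

import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

// Hypothetical message wrapper carrying the logical entity identifier.
final case class EntityEnvelope(id: Long, payload: Any)

// Hypothetical entity actor; in practice it would typically have persistent (durable) state.
class Counter extends Actor {
  private var count = 0
  def receive: Receive = {
    case "increment" => count += 1
  }
}

object ShardingUsageSketch extends App {
  val system = ActorSystem("ClusterSystem")

  // The two application specific functions: extract the entity id and the shard id from a message.
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case EntityEnvelope(id, payload) => (id.toString, payload)
  }
  val numberOfShards = 100
  val extractShardId: ShardRegion.ExtractShardId = {
    case EntityEnvelope(id, _) => (id % numberOfShards).toString
  }

  // 1. Register the entity type at system startup on each cluster node.
  val counterRegion: ActorRef = ClusterSharding(system).start(
    typeName = "Counter",
    entityProps = Props[Counter],
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)

  // 2. Retrieve the local ShardRegion and send messages to entities through it.
  ClusterSharding(system).shardRegion("Counter") ! EntityEnvelope(42L, "increment")
}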
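
The '''Shard Allocation''' paragraph refers to the pluggable allocation strategy and the `rebalance-threshold` of [[ShardCoordinator.LeastShardAllocationStrategy]]. Besides configuration, a strategy instance can be passed to the `start` overload that also takes `allocationStrategy` and `handOffStopMessage`. A sketch under the same assumptions as above, reusing `system`, `Counter`, `extractEntityId` and `extractShardId` from the previous snippet; the threshold values are illustrative only, not recommendations.

import akka.actor.{ ActorRef, PoisonPill, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardCoordinator }

// Rebalance when the difference in allocated shards reaches 3, moving at most 2 shards at a time
// (the constructor arguments are the rebalance threshold and the maximum simultaneous rebalances).
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(3, 2)

val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entityProps = Props[Counter],
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId,
  allocationStrategy = allocationStrategy,
  handOffStopMessage = PoisonPill)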