diff --git a/akka-contrib/docs/cluster-sharding.rst b/akka-contrib/docs/cluster-sharding.rst
index 9f1473769d..5e405c662c 100644
--- a/akka-contrib/docs/cluster-sharding.rst
+++ b/akka-contrib/docs/cluster-sharding.rst
@@ -18,7 +18,7 @@ it might be easier to run them on a :ref:`cluster-singleton` node.
 
 In this context sharding means that actors with an identifier, so called entries,
 can be automatically distributed across multiple nodes in the cluster. Each entry
 actor runs only at one place, and messages can be sent to the entry without requiring
-the sender() to know the location of the destination actor. This is achieved by sending
+the sender to know the location of the destination actor. This is achieved by sending
 the messages via a ``ShardRegion`` actor provided by this extension, which knows how
 to route the message with the entry id to the final destination.
@@ -53,16 +53,27 @@ This example illustrates two different ways to define the entry identifier in th
 sent to the entry actor is wrapped in the envelope. Note how these two messages
 types are handled in the ``entryId`` and ``entryMessage`` methods shown above.
 
+The message sent to the entry actor is what ``entryMessage`` returns, which makes it possible to unwrap envelopes
+if needed.
+
 A shard is a group of entries that will be managed together. The grouping is defined by the
-``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
-in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
-As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
-of cluster nodes.
+``shardResolver`` function shown above. For a specific entry identifier the shard identifier must always
+be the same. Otherwise the entry actor might accidentally be started in several places at the same time.
 
-Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
-named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
-lookup the location of the shard for the entry if it does not already know its location. It will
+Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
+i.e. the same number of entries in each shard. As a rule of thumb, the number of shards should be a factor of ten
+greater than the planned maximum number of cluster nodes. Fewer shards than the number of nodes will result in some
+nodes not hosting any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing
+overhead, and increased latency because the coordinator is involved in the routing of the first message for each
+shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
+all nodes in the cluster.
+
+A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo
+the number of shards.
+
+Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
+named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
+The ``ShardRegion`` will lookup the location of the shard for the entry if it does not already know its location. It will
 delegate the message to the right node and it will create the entry actor on demand, i.e.
 when the first message for a specific entry is delivered.
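
The ``hashCode``-based algorithm recommended above could, for illustration, look like the sketch below. It is not
part of the patch: the ``EntryEnvelope`` and ``Get`` classes are minimal stand-ins for the counter messages used in
``ClusterShardingSpec`` further down, and ``math.abs`` is only there to keep negative hash codes from producing
negative shard identifiers::

    import akka.contrib.pattern.ShardRegion

    object HashCodeShardResolverSketch {
      // minimal stand-ins for the counter messages defined in ClusterShardingSpec
      final case class EntryEnvelope(id: Long, payload: Any)
      final case class Get(counterId: Long)

      val numberOfShards = 100

      // hashCode of the entry identifier modulo the number of shards
      val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
        case EntryEnvelope(id, _) ⇒ math.abs(id.hashCode % numberOfShards).toString
        case Get(id)              ⇒ math.abs(id.hashCode % numberOfShards).toString
      }
    }
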
@@ -99,16 +110,27 @@ This example illustrates two different ways to define the entry identifier in th
 sent to the entry actor is wrapped in the envelope. Note how these two messages
 types are handled in the ``idExtractor`` function shown above.
 
+The message sent to the entry actor is the second part of the tuple returned by the ``idExtractor``, which makes it
+possible to unwrap envelopes if needed.
+
 A shard is a group of entries that will be managed together. The grouping is defined by the
-``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
-in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
-As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
-of cluster nodes.
+``shardResolver`` function shown above. For a specific entry identifier the shard identifier must always
+be the same.
 
-Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
-named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
-lookup the location of the shard for the entry if it does not already know its location. It will
+Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
+i.e. the same number of entries in each shard. As a rule of thumb, the number of shards should be a factor of ten
+greater than the planned maximum number of cluster nodes. Fewer shards than the number of nodes will result in some
+nodes not hosting any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing
+overhead, and increased latency because the coordinator is involved in the routing of the first message for each
+shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
+all nodes in the cluster.
+
+A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo
+the number of shards.
+
+Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
+named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
+The ``ShardRegion`` will lookup the location of the shard for the entry if it does not already know its location. It will
 delegate the message to the right node and it will create the entry actor on demand, i.e.
 when the first message for a specific entry is delivered.
@@ -200,11 +222,11 @@ actor will take over and the state is recovered. During such a failure period sh
 with known location are still available, while messages for new (unknown) shards
 are buffered until the new ``ShardCoordinator`` becomes available.
 
-As long as a sender() uses the same ``ShardRegion`` actor to deliver messages to an entry
+As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entry
 actor the order of the messages is preserved. As long as the buffer limit is not reached
 messages are delivered on a best effort basis, with at-most once delivery semantics,
 in the same way as ordinary message sending. Reliable end-to-end messaging, with
-at-least-once semantics can be added by using channels in ``akka-persistence``.
+at-least-once semantics can be added by using ``AtLeastOnceDelivery`` in ``akka-persistence``.
 
 Some additional latency is introduced for messages targeted to new or previously
 unused shards due to the round-trip to the coordinator. Rebalancing of shards may
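
With the documentation now pointing at ``AtLeastOnceDelivery`` instead of channels, a rough sketch of such a reliable
sender in front of a ``ShardRegion`` is shown below. It is only an illustration, not part of the patch: ``Send``,
``Msg``, ``Confirm`` and the two events are invented names, the region's ``idExtractor`` would have to understand
``Msg`` (or the delivery id would have to travel inside an ``EntryEnvelope`` payload), the entry actor has to reply
with ``Confirm``, and the exact ``deliver`` signature varies between akka-persistence versions::

    import akka.actor.ActorPath
    import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }

    object ReliableShardClient {
      final case class Send(entryId: Long, payload: Any)
      final case class Msg(deliveryId: Long, entryId: Long, payload: Any)
      final case class Confirm(deliveryId: Long)
      final case class MsgSent(entryId: Long, payload: Any)
      final case class MsgConfirmed(deliveryId: Long)
    }

    class ReliableShardClient(shardRegionPath: ActorPath)
      extends PersistentActor with AtLeastOnceDelivery {
      import ReliableShardClient._

      override def persistenceId = "reliable-shard-client"

      def receiveCommand: Receive = {
        case Send(entryId, payload) ⇒ persist(MsgSent(entryId, payload))(updateState)
        case Confirm(deliveryId)    ⇒ persist(MsgConfirmed(deliveryId))(updateState)
      }

      def receiveRecover: Receive = {
        case evt: MsgSent      ⇒ updateState(evt)
        case evt: MsgConfirmed ⇒ updateState(evt)
      }

      def updateState(evt: Any): Unit = evt match {
        case MsgSent(entryId, payload) ⇒
          // keeps redelivering Msg until confirmDelivery(deliveryId) is called;
          // later Akka versions use the curried form deliver(destination)(deliveryId ⇒ ...)
          deliver(shardRegionPath, deliveryId ⇒ Msg(deliveryId, entryId, payload))
        case MsgConfirmed(deliveryId) ⇒ confirmDelivery(deliveryId)
      }
    }
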
diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
index 04a9590487..e4f61b4d79 100644
--- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
+++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
@@ -126,7 +126,7 @@ import akka.cluster.ClusterEvent.ClusterDomainEvent
  * actor the order of the messages is preserved. As long as the buffer limit is not reached
  * messages are delivered on a best effort basis, with at-most once delivery semantics,
  * in the same way as ordinary message sending. Reliable end-to-end messaging, with
- * at-least-once semantics can be added by using channels in `akka-persistence`.
+ * at-least-once semantics can be added by using `AtLeastOnceDelivery` in `akka-persistence`.
  *
  * Some additional latency is introduced for messages targeted to new or previously
  * unused shards due to the round-trip to the coordinator. Rebalancing of shards may
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
index 6d73e1037e..6667510810 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
@@ -108,15 +108,35 @@ object ClusterShardingSpec extends MultiNodeConfig {
   }
   //#counter-actor
 
+  val idExtractor: ShardRegion.IdExtractor = {
+    case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
+    case msg @ Get(id)              ⇒ (id.toString, msg)
+  }
+
+  val numberOfShards = 12
+
+  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
+    case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
+    case Get(id)              ⇒ (id % numberOfShards).toString
+  }
+
+}
+
+// only used in documentation
+object ClusterShardingDocCode {
+  import ClusterShardingSpec._
+
   //#counter-extractor
   val idExtractor: ShardRegion.IdExtractor = {
     case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
     case msg @ Get(id)              ⇒ (id.toString, msg)
   }
 
+  val numberOfShards = 100
+
   val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
-    case EntryEnvelope(id, _) ⇒ (id % 12).toString
-    case Get(id)              ⇒ (id % 12).toString
+    case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
+    case Get(id)              ⇒ (id % numberOfShards).toString
   }
   //#counter-extractor
@@ -496,16 +516,16 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       runOn(fifth) {
         //#counter-usage
        val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
-        counterRegion ! Get(100)
+        counterRegion ! Get(123)
         expectMsg(0)
 
-        counterRegion ! EntryEnvelope(100, Increment)
-        counterRegion ! Get(100)
+        counterRegion ! EntryEnvelope(123, Increment)
+        counterRegion ! Get(123)
         expectMsg(1)
         //#counter-usage
 
-        ClusterSharding(system).shardRegion("AnotherCounter") ! EntryEnvelope(100, Decrement)
-        ClusterSharding(system).shardRegion("AnotherCounter") ! Get(100)
+        ClusterSharding(system).shardRegion("AnotherCounter") ! EntryEnvelope(123, Decrement)
+        ClusterSharding(system).shardRegion("AnotherCounter") ! Get(123)
         expectMsg(-1)
       }
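
The ``numberOfShards = 100`` used for the documentation sample above is easy to sanity-check against the
uniform-distribution goal. The small stand-alone snippet below is not part of the patch, just an illustration of how
``id % numberOfShards`` spreads sequential entry identifiers over the shards::

    // Illustration only: 10000 sequential entry ids spread evenly over 100 shards.
    object ShardDistributionCheck extends App {
      val numberOfShards = 100
      val shardIds = (1L to 10000L).map(id ⇒ (id % numberOfShards).toString)
      val entriesPerShard = shardIds.groupBy(identity).map { case (shard, ids) ⇒ shard -> ids.size }
      // prints: shards used: 100, entries per shard: min=100 max=100
      println(s"shards used: ${entriesPerShard.size}, " +
        s"entries per shard: min=${entriesPerShard.values.min} max=${entriesPerShard.values.max}")
    }
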
diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
index 0a24529871..21acfdbc3c 100644
--- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
+++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java
@@ -48,12 +48,13 @@ public class ClusterShardingTest {
 
       @Override
       public String shardId(Object message) {
+        int numberOfShards = 100;
        if (message instanceof Counter.EntryEnvelope) {
           long id = ((Counter.EntryEnvelope) message).id;
-          return String.valueOf(id % 10);
+          return String.valueOf(id % numberOfShards);
         } else if (message instanceof Counter.Get) {
           long id = ((Counter.Get) message).counterId;
-          return String.valueOf(id % 10);
+          return String.valueOf(id % numberOfShards);
         } else {
           return null;
         }
@@ -63,17 +64,17 @@ public class ClusterShardingTest {
     //#counter-extractor
 
     //#counter-start
-    ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class), false,
-      messageExtractor);
+    ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
+      Props.create(Counter.class), false, messageExtractor);
     //#counter-start
 
     //#counter-usage
     ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
-    counterRegion.tell(new Counter.Get(100), getSelf());
+    counterRegion.tell(new Counter.Get(123), getSelf());
 
-    counterRegion.tell(new Counter.EntryEnvelope(100,
+    counterRegion.tell(new Counter.EntryEnvelope(123,
       Counter.CounterOp.INCREMENT), getSelf());
-    counterRegion.tell(new Counter.Get(100), getSelf());
+    counterRegion.tell(new Counter.Get(123), getSelf());
     //#counter-usage
   }
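
To make the ``expectMsg(0)`` / ``expectMsg(1)`` flow in the usage snippets easier to follow, the sketch below shows a
deliberately simplified, non-persistent counter that speaks the same ``Get`` / ``Increment`` / ``Decrement`` protocol.
It is an illustration only; the real ``Counter`` entry actor is defined in ``ClusterShardingSpec`` and is not shown in
this diff::

    import akka.actor.Actor

    object SimpleCounter {
      case object Increment
      case object Decrement
      final case class Get(counterId: Long)
    }

    // Illustration only: a non-persistent stand-in for the Counter entry actor.
    class SimpleCounter extends Actor {
      import SimpleCounter._

      var count = 0

      def receive = {
        case Increment ⇒ count += 1
        case Decrement ⇒ count -= 1
        case Get(_)    ⇒ sender() ! count // first Get replies 0, after an Increment it replies 1
      }
    }
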