Merge pull request #16149 from akka/wip-clarify-sharding-docs-patriknw

=doc Clarify cluster sharding docs
This commit is contained in:
Patrik Nordwall 2014-10-31 10:07:30 +01:00
commit f2f88d9dd7
4 changed files with 75 additions and 32 deletions

View file

@@ -18,7 +18,7 @@ it might be easier to run them on a :ref:`cluster-singleton` node.
In this context sharding means that actors with an identifier, so called entries,
can be automatically distributed across multiple nodes in the cluster. Each entry
actor runs only at one place, and messages can be sent to the entry without requiring
the sender() to know the location of the destination actor. This is achieved by sending
the sender to know the location of the destination actor. This is achieved by sending
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
to route the message with the entry id to the final destination.
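For orientation, a minimal sketch of what this looks like from the sending side, assuming the ``akka-contrib`` package layout of this Akka generation and a sharded entry type named ``"Counter"`` with a hypothetical ``Get`` message (both mirror the samples further down)::

  import akka.actor.{ ActorRef, ActorSystem }
  import akka.contrib.pattern.ClusterSharding

  // Hypothetical query message; the samples below use a similar Get(id).
  final case class Get(counterId: Long)

  def queryCounter(system: ActorSystem, counterId: Long): Unit = {
    // The local ShardRegion proxy for the "Counter" entry type
    // (the type must have been started with ClusterSharding.start beforehand).
    val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
    // The sender neither knows nor cares which node hosts this entry;
    // the ShardRegion resolves the shard and routes the message there.
    counterRegion ! Get(counterId)
  }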
@@ -53,16 +53,27 @@ This example illustrates two different ways to define the entry identifier in th
sent to the entry actor is wrapped in the envelope.
Note how these two message types are handled in the ``entryId`` and ``entryMessage`` methods shown above.
The message sent to the entry actor is what ``entryMessage`` returns and that makes it possible to unwrap envelopes
if needed.
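Put differently, a message can either carry the entry identifier itself or be wrapped in an envelope that supplies it; a small sketch with simplified stand-ins for the sample messages used further down::

  // Way one: the entry identifier is part of the message itself ...
  final case class Get(counterId: Long)

  // ... way two: a message without an identifier of its own (here Increment)
  // is wrapped in an envelope that carries the identifier. The extractor
  // unwraps the envelope so the entry actor only sees the payload.
  case object Increment
  final case class EntryEnvelope(id: Long, payload: Any)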
A shard is a group of entries that will be managed together. The grouping is defined by the
``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
of cluster nodes.
``shardResolver`` function shown above. For a specific entry identifier the shard identifier must always
be the same. Otherwise the entry actor might accidentally be started in several places at the same time.
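To illustrate this requirement, here is a small sketch using the ``ShardRegion.ShardResolver`` type from the Scala samples below; the message classes are simplified stand-ins, and the broken variant is hypothetical, only there to show the failure mode::

  import scala.util.Random
  import akka.contrib.pattern.ShardRegion

  // Simplified stand-ins for the sample messages used further down.
  final case class EntryEnvelope(id: Long, payload: Any)
  final case class Get(counterId: Long)

  val numberOfShards = 100

  // OK: the shard identifier is a pure function of the entry identifier,
  // so every node computes the same shard for the same entry.
  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
    case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
    case Get(id)              ⇒ (id % numberOfShards).toString
  }

  // NOT OK: the shard identifier is not determined by the entry identifier
  // alone, so different nodes could map the same entry to different shards
  // and start the entry actor in several places at once.
  val brokenResolver: ShardRegion.ShardResolver =
    _ ⇒ Random.nextInt(numberOfShards).toString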
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
lookup the location of the shard for the entry if it does not already know its location. It will
Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
i.e. the same number of entries in each shard. As a rule of thumb, the number of shards should be a factor ten greater
than the planned maximum number of cluster nodes. Fewer shards than nodes will result in some nodes not hosting
any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead,
and increased latency because the coordinator is involved in the routing of the first message for each shard.
The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
all nodes in the cluster.
A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo
the number of shards.
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
The ``ShardRegion`` will look up the location of the shard for the entry if it does not already know it. It will
delegate the message to the right node and it will create the entry actor on demand, i.e. when the
first message for a specific entry is delivered.
@@ -99,16 +110,27 @@ This example illustrates two different ways to define the entry identifier in th
sent to the entry actor is wrapped in the envelope.
Note how these two message types are handled in the ``idExtractor`` function shown above.
The message sent to the entry actor is the second part of the tuple returned by the ``idExtractor`` and that makes it
possible to unwrap envelopes if needed.
A shard is a group of entries that will be managed together. The grouping is defined by the
``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
of cluster nodes.
``shardResolver`` function shown above. For a specific entry identifier the shard identifier must always
be the same.
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
lookup the location of the shard for the entry if it does not already know its location. It will
Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
i.e. the same number of entries in each shard. As a rule of thumb, the number of shards should be a factor ten greater
than the planned maximum number of cluster nodes. Fewer shards than nodes will result in some nodes not hosting
any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead,
and increased latency because the coordinator is involved in the routing of the first message for each shard.
The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
all nodes in the cluster.
A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo
the number of shards.
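For entry identifiers that are not numeric, the same idea can be expressed with ``hashCode``; a small sketch, assuming ``String`` entry identifiers::

  val numberOfShards = 100

  // Deterministic shard id derived from an arbitrary String entry id.
  // toLong sidesteps the abs(Int.MinValue) corner case of Int hash codes.
  def shardIdOf(entryId: String): String =
    (math.abs(entryId.hashCode.toLong) % numberOfShards).toString

  // shardIdOf("user-42") yields the same shard id on every node.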
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
The ``ShardRegion`` will look up the location of the shard for the entry if it does not already know it. It will
delegate the message to the right node and it will create the entry actor on demand, i.e. when the
first message for a specific entry is delivered.
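To make "created on demand" concrete, here is a deliberately simplified, non-persistent sketch of such an entry actor, with a message protocol mirroring the counter samples below (the actual sample in the repository may differ, e.g. by persisting its state with ``akka-persistence``)::

  import akka.actor.Actor

  // Simplified message protocol, mirroring the samples below.
  final case class Get(counterId: Long)
  case object Increment
  case object Decrement

  // A minimal entry actor: the ShardRegion creates it lazily when the
  // first message for its entry identifier arrives.
  class Counter extends Actor {
    private var count = 0

    def receive = {
      case Increment ⇒ count += 1
      case Decrement ⇒ count -= 1
      case Get(_)    ⇒ sender() ! count
    }
  }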
@@ -200,11 +222,11 @@ actor will take over and the state is recovered. During such a failure period sh
with known location are still available, while messages for new (unknown) shards
are buffered until the new ``ShardCoordinator`` becomes available.
As long as a sender() uses the same ``ShardRegion`` actor to deliver messages to an entry
As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entry
actor the order of the messages is preserved. As long as the buffer limit is not reached
messages are delivered on a best effort basis, with at-most once delivery semantics,
in the same way as ordinary message sending. Reliable end-to-end messaging, with
at-least-once semantics can be added by using channels in ``akka-persistence``.
at-least-once semantics can be added by using ``AtLeastOnceDelivery`` in ``akka-persistence``.
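A rough sketch of that pattern, assuming the ``AtLeastOnceDelivery`` trait of this Akka generation's ``akka-persistence`` module; the message and event classes are illustrative, and the exact ``deliver`` signature has changed between Akka versions::

  import akka.actor.ActorPath
  import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }

  // Illustrative messages and events, not part of the sharding samples.
  final case class Msg(deliveryId: Long, payload: String)
  final case class Confirm(deliveryId: Long)
  final case class MsgSent(payload: String)
  final case class MsgConfirmed(deliveryId: Long)

  class ReliableSender(destination: ActorPath)
    extends PersistentActor with AtLeastOnceDelivery {

    override def persistenceId = "reliable-sender"

    override def receiveCommand = {
      case payload: String     ⇒ persist(MsgSent(payload))(updateState)
      case Confirm(deliveryId) ⇒ persist(MsgConfirmed(deliveryId))(updateState)
    }

    override def receiveRecover = {
      case evt @ (_: MsgSent | _: MsgConfirmed) ⇒ updateState(evt)
    }

    private def updateState(evt: Any): Unit = evt match {
      case MsgSent(payload) ⇒
        // Redelivered periodically until confirmDelivery is called
        // with the matching deliveryId.
        deliver(destination, deliveryId ⇒ Msg(deliveryId, payload))
      case MsgConfirmed(deliveryId) ⇒
        confirmDelivery(deliveryId)
    }
  }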
Some additional latency is introduced for messages targeted to new or previously
unused shards due to the round-trip to the coordinator. Rebalancing of shards may

View file

@@ -126,7 +126,7 @@ import akka.cluster.ClusterEvent.ClusterDomainEvent
* actor the order of the messages is preserved. As long as the buffer limit is not reached
* messages are delivered on a best effort basis, with at-most once delivery semantics,
* in the same way as ordinary message sending. Reliable end-to-end messaging, with
* at-least-once semantics can be added by using channels in `akka-persistence`.
* at-least-once semantics can be added by using `AtLeastOnceDelivery` in `akka-persistence`.
*
* Some additional latency is introduced for messages targeted to new or previously
* unused shards due to the round-trip to the coordinator. Rebalancing of shards may

View file

@@ -108,15 +108,35 @@ object ClusterShardingSpec extends MultiNodeConfig {
}
//#counter-actor
val idExtractor: ShardRegion.IdExtractor = {
  case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
  case msg @ Get(id)              ⇒ (id.toString, msg)
}
val numberOfShards = 12
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
  case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
  case Get(id)              ⇒ (id % numberOfShards).toString
}
}
// only used in documentation
object ClusterShardingDocCode {
import ClusterShardingSpec._
//#counter-extractor
val idExtractor: ShardRegion.IdExtractor = {
  case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
  case msg @ Get(id)              ⇒ (id.toString, msg)
}
val numberOfShards = 100
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
  case EntryEnvelope(id, _) ⇒ (id % 12).toString
  case Get(id)              ⇒ (id % 12).toString
  case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
  case Get(id)              ⇒ (id % numberOfShards).toString
}
//#counter-extractor
@@ -496,16 +516,16 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(fifth) {
//#counter-usage
val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
counterRegion ! Get(100)
counterRegion ! Get(123)
expectMsg(0)
counterRegion ! EntryEnvelope(100, Increment)
counterRegion ! Get(100)
counterRegion ! EntryEnvelope(123, Increment)
counterRegion ! Get(123)
expectMsg(1)
//#counter-usage
ClusterSharding(system).shardRegion("AnotherCounter") ! EntryEnvelope(100, Decrement)
ClusterSharding(system).shardRegion("AnotherCounter") ! Get(100)
ClusterSharding(system).shardRegion("AnotherCounter") ! EntryEnvelope(123, Decrement)
ClusterSharding(system).shardRegion("AnotherCounter") ! Get(123)
expectMsg(-1)
}

View file

@@ -48,12 +48,13 @@ public class ClusterShardingTest {
@Override
public String shardId(Object message) {
int numberOfShards = 100;
if (message instanceof Counter.EntryEnvelope) {
long id = ((Counter.EntryEnvelope) message).id;
return String.valueOf(id % 10);
return String.valueOf(id % numberOfShards);
} else if (message instanceof Counter.Get) {
long id = ((Counter.Get) message).counterId;
return String.valueOf(id % 10);
return String.valueOf(id % numberOfShards);
} else {
return null;
}
@@ -63,17 +64,17 @@
//#counter-extractor
//#counter-start
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class), false,
messageExtractor);
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
Props.create(Counter.class), false, messageExtractor);
//#counter-start
//#counter-usage
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
counterRegion.tell(new Counter.Get(100), getSelf());
counterRegion.tell(new Counter.Get(123), getSelf());
counterRegion.tell(new Counter.EntryEnvelope(100,
counterRegion.tell(new Counter.EntryEnvelope(123,
Counter.CounterOp.INCREMENT), getSelf());
counterRegion.tell(new Counter.Get(100), getSelf());
counterRegion.tell(new Counter.Get(123), getSelf());
//#counter-usage
}