+con #3758 Provide sharding of actors in a cluster
* Delete files only from controller * Change default retry settings of singleton to speedup startup * Persistent snapshots
This commit is contained in:
parent
4972c7780c
commit
b3b66db643
14 changed files with 2259 additions and 27 deletions
235
akka-contrib/docs/cluster-sharding.rst
Normal file
235
akka-contrib/docs/cluster-sharding.rst
Normal file
|
|
@ -0,0 +1,235 @@
|
|||
.. _cluster-sharding:
|
||||
|
||||
Cluster Sharding
|
||||
================
|
||||
|
||||
The typical use case for this feature is when you have many stateful actors that together consume
|
||||
more resources (e.g. memory) than fit on one machine. You need to distribute them across
|
||||
several nodes in the cluster and you want to be able to interact with them using their
|
||||
logical identifier, but without having to care about their physical location in the cluster,
|
||||
which might also change over time. It could for example be actors representing Aggregate Roots in
|
||||
Domain-Driven Design terminology. Here we call these actors "entries". These actors
|
||||
typically have persistent (durable) state, but this feature is not limited to
|
||||
actors with persistent state.
|
||||
|
||||
In this context sharding means that actors with an identifier, so called entries,
|
||||
can be automatically distributed across multiple nodes in the cluster. Each entry
|
||||
actor runs only at one place, and messages can be sent to the entry without requiring
|
||||
the sender to know the location of the destination actor. This is achieved by sending
|
||||
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
|
||||
to route the message with the entry id to the final destination.
|
||||
|
||||
An Example in Java
|
||||
------------------
|
||||
|
||||
This is how an entry actor may look like:
|
||||
|
||||
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-actor
|
||||
|
||||
The above actor uses event sourcing and the support provided in ``UntypedEventsourcedProcessor`` to store its state.
|
||||
It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover
|
||||
its state if it is valuable.
|
||||
|
||||
When using the sharding extension you are first, typically at system startup on each node
|
||||
in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start``
|
||||
method.
|
||||
|
||||
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-start
|
||||
|
||||
The ``messageExtractor`` defines application specific methods to extract the entry
|
||||
identifier and the shard identifier from incoming messages.
|
||||
|
||||
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-extractor
|
||||
|
||||
This example illustrates two different ways to define the entry identifier in the messages:
|
||||
|
||||
* The ``Get`` message includes the the identifier itself.
|
||||
* The ``EntryEnvelope`` holds the identifier, and the actual message that is
|
||||
sent to the entry actor is wrapped in the envelope.
|
||||
|
||||
Note how these two messages types are handled in the ``entryId`` and ``entryMessage`` methods shown above.
|
||||
|
||||
A shard is a group of entries that will be managed together. The grouping is defined by the
|
||||
``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
|
||||
in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
|
||||
As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
|
||||
of cluster nodes.
|
||||
|
||||
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
|
||||
named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
|
||||
lookup the location of the shard for the entry if it does not already know its location. It will
|
||||
delegate the message to the right node and it will create the entry actor on demand, i.e. when the
|
||||
first message for a specific entry is delivered.
|
||||
|
||||
.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-usage
|
||||
|
||||
An Example in Scala
|
||||
-------------------
|
||||
|
||||
This is how an entry actor may look like:
|
||||
|
||||
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-actor
|
||||
|
||||
The above actor uses event sourcing and the support provided in ``EventsourcedProcessor`` to store its state.
|
||||
It does not have to be a processor, but in case of failure or migration of entries between nodes it must be able to recover
|
||||
its state if it is valuable.
|
||||
|
||||
When using the sharding extension you are first, typically at system startup on each node
|
||||
in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start``
|
||||
method.
|
||||
|
||||
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-start
|
||||
|
||||
The ``idExtractor`` and ``shardResolver`` are two application specific functions to extract the entry
|
||||
identifier and the shard identifier from incoming messages.
|
||||
|
||||
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-extractor
|
||||
|
||||
This example illustrates two different ways to define the entry identifier in the messages:
|
||||
|
||||
* The ``Get`` message includes the the identifier itself.
|
||||
* The ``EntryEnvelope`` holds the identifier, and the actual message that is
|
||||
sent to the entry actor is wrapped in the envelope.
|
||||
|
||||
Note how these two messages types are handled in the ``idExtractor`` function shown above.
|
||||
|
||||
A shard is a group of entries that will be managed together. The grouping is defined by the
|
||||
``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
|
||||
in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
|
||||
As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
|
||||
of cluster nodes.
|
||||
|
||||
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
|
||||
named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
|
||||
lookup the location of the shard for the entry if it does not already know its location. It will
|
||||
delegate the message to the right node and it will create the entry actor on demand, i.e. when the
|
||||
first message for a specific entry is delivered.
|
||||
|
||||
.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-usage
|
||||
|
||||
How it works
|
||||
------------
|
||||
|
||||
The ``ShardRegion`` actor is started on each node in the cluster, or group of nodes
|
||||
tagged with a specific role. The ``ShardRegion`` is created with two application specific
|
||||
functions to extract the entry identifier and the shard identifier from incoming messages.
|
||||
A shard is a group of entries that will be managed together. For the first message in a
|
||||
specific shard the ``ShardRegion`` request the location of the shard from a central coordinator,
|
||||
the ``ShardCoordinator``.
|
||||
|
||||
The ``ShardCoordinator`` decides which ``ShardRegion`` that
|
||||
owns the shard. The ``ShardRegion`` receives the decided home of the shard
|
||||
and if that is the ``ShardRegion`` instance itself it will create a local child
|
||||
actor representing the entry and direct all messages for that entry to it.
|
||||
If the shard home is another ``ShardRegion`` instance messages will be forwarded
|
||||
to that ``ShardRegion`` instance instead. While resolving the location of a
|
||||
shard incoming messages for that shard are buffered and later delivered when the
|
||||
shard home is known. Subsequent messages to the resolved shard can be delivered
|
||||
to the target destination immediately without involving the ``ShardCoordinator``.
|
||||
|
||||
Scenario 1:
|
||||
|
||||
#. Incoming message M1 to ``ShardRegion`` instance R1.
|
||||
#. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1.
|
||||
#. C answers that the home of S1 is R1.
|
||||
#. R1 creates child actor for the entry E1 and sends buffered messages for S1 to E1 child
|
||||
#. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entry children as needed, and forwards messages to them.
|
||||
|
||||
Scenario 2:
|
||||
|
||||
#. Incoming message M2 to R1.
|
||||
#. M2 is mapped to S2. R1 doesn't know about S2, so it asks C for the location of S2.
|
||||
#. C answers that the home of S2 is R2.
|
||||
#. R1 sends buffered messages for S2 to R2
|
||||
#. All incoming messages for S2 which arrive at R1 can be handled by R1 without C. It forwards messages to R2.
|
||||
#. R2 receives message for S2, ask C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2).
|
||||
|
||||
To make sure that at most one instance of a specific entry actor is running somewhere
|
||||
in the cluster it is important that all nodes have the same view of where the shards
|
||||
are located. Therefore the shard allocation decisions are taken by the central
|
||||
``ShardCoordinator``, which is running as a cluster singleton, i.e. one instance on
|
||||
the oldest member among all cluster nodes or a group of nodes tagged with a specific
|
||||
role.
|
||||
|
||||
The logic that decides where a shard is to be located is defined in a pluggable shard
|
||||
allocation strategy. The default implementation ``ShardCoordinator.LeastShardAllocationStrategy``
|
||||
allocates new shards to the ``ShardRegion`` with least number of previously allocated shards.
|
||||
This strategy can be replaced by an application specific implementation.
|
||||
|
||||
To be able to use newly added members in the cluster the coordinator facilitates rebalancing
|
||||
of shards, i.e. migrate entries from one node to another. In the rebalance process the
|
||||
coordinator first notifies all ``ShardRegion`` actors that a handoff for a shard has started.
|
||||
That means they will start buffering incoming messages for that shard, in the same way as if the
|
||||
shard location is unknown. During the rebalance process the coordinator will not answer any
|
||||
requests for the location of shards that are being rebalanced, i.e. local buffering will
|
||||
continue until the handoff is completed. The ``ShardRegion`` responsible for the rebalanced shard
|
||||
will stop all entries in that shard by sending ``PoisonPill`` to them. When all entries have
|
||||
been terminated the ``ShardRegion`` owning the entries will acknowledge the handoff as completed
|
||||
to the coordinator. Thereafter the coordinator will reply to requests for the location of
|
||||
the shard and thereby allocate a new home for the shard and then buffered messages in the
|
||||
``ShardRegion`` actors are delivered to the new location. This means that the state of the entries
|
||||
are not transferred or migrated. If the state of the entries are of importance it should be
|
||||
persistent (durable), e.g. with ``akka-persistence``, so that it can be recovered at the new
|
||||
location.
|
||||
|
||||
The logic that decides which shards to rebalance is defined in a pluggable shard
|
||||
allocation strategy. The default implementation ``ShardCoordinator.LeastShardAllocationStrategy``
|
||||
picks shards for handoff from the ``ShardRegion`` with most number of previously allocated shards.
|
||||
They will then be allocated to the ``ShardRegion`` with least number of previously allocated shards,
|
||||
i.e. new members in the cluster. There is a configurable threshold of how large the difference
|
||||
must be to begin the rebalancing. This strategy can be replaced by an application specific
|
||||
implementation.
|
||||
|
||||
The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with
|
||||
``akka-persistence`` to survive failures. Since it is running in a cluster ``akka-persistence``
|
||||
must be configured with a distributed journal. When a crashed or unreachable coordinator
|
||||
node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
|
||||
actor will take over and the state is recovered. During such a failure period shards
|
||||
with known location are still available, while messages for new (unknown) shards
|
||||
are buffered until the new ``ShardCoordinator`` becomes available.
|
||||
|
||||
As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entry
|
||||
actor the order of the messages is preserved. As long as the buffer limit is not reached
|
||||
messages are delivered on a best effort basis, with at-most once delivery semantics,
|
||||
in the same way as ordinary message sending. Reliable end-to-end messaging, with
|
||||
at-least-once semantics can be added by using channels in ``akka-persistence``.
|
||||
|
||||
Some additional latency is introduced for messages targeted to new or previously
|
||||
unused shards due to the round-trip to the coordinator. Rebalancing of shards may
|
||||
also add latency. This should be considered when designing the application specific
|
||||
shard resolution, e.g. to avoid too fine grained shards.
|
||||
|
||||
Proxy Only Mode
|
||||
---------------
|
||||
|
||||
The ``ShardRegion`` actor can also be started in proxy only mode, i.e. it will not
|
||||
host any entries itself, but knows how to delegate messages to the right location.
|
||||
A ``ShardRegion`` starts in proxy only mode if the roles of the node does not include
|
||||
the node role specified in ``akka.contrib.cluster.sharding.role`` config property
|
||||
or if the specified `entryProps` is ``None`` / ``null``.
|
||||
|
||||
Passivation
|
||||
-----------
|
||||
|
||||
If the state of the entries are persistent you may stop entries that are not used to
|
||||
reduce memory consumption. This is done by the application specific implementation of
|
||||
the entry actors for example by defining receive timeout (``context.setReceiveTimeout``).
|
||||
If a message is already enqueued to the entry when it stops itself the enqueued message
|
||||
in the mailbox will be dropped. To support graceful passivation without loosing such
|
||||
messages the entry actor can send ``ShardRegion.Passivate`` to its parent ``ShardRegion``.
|
||||
The specified wrapped message in ``Passivate`` will be sent back to the entry, which is
|
||||
then supposed to stop itself. Incoming messages will be buffered by the ``ShardRegion``
|
||||
between reception of ``Passivate`` and termination of the the entry. Such buffered messages
|
||||
are thereafter delivered to a new incarnation of the entry.
|
||||
|
||||
Configuration
|
||||
-------------
|
||||
|
||||
The ``ClusterSharding`` extension can be configured with the following properties:
|
||||
|
||||
.. includecode:: @contribSrc@/src/main/resources/reference.conf#sharding-ext-config
|
||||
|
||||
Custom shard allocation strategy can be defined in an optional parameter to
|
||||
``ClusterSharding.start``. See the API documentation of ``ShardAllocationStrategy``
|
||||
(Scala) or ``AbstractShardAllocationStrategy`` (Java) for details of how to implement a custom
|
||||
shard allocation strategy.
|
||||
|
|
@ -35,6 +35,7 @@ The Current List of Modules
|
|||
jul
|
||||
peek-mailbox
|
||||
cluster-singleton
|
||||
cluster-sharding
|
||||
distributed-pub-sub
|
||||
cluster-client
|
||||
aggregator
|
||||
|
|
|
|||
|
|
@ -70,3 +70,38 @@ akka.contrib.cluster.client {
|
|||
}
|
||||
}
|
||||
# //#cluster-client-mailbox-config
|
||||
|
||||
|
||||
# //#sharding-ext-config
|
||||
# Settings for the ClusterShardingExtension
|
||||
akka.contrib.cluster.sharding {
|
||||
# The extension creates a top level actor with this name in top level user scope,
|
||||
# e.g. '/user/sharding'
|
||||
guardian-name = sharding
|
||||
# Start the coordinator singleton manager on members tagged with this role.
|
||||
# All members are used if undefined or empty.
|
||||
# ShardRegion actor is started in proxy only mode on nodes that are not tagged
|
||||
# with this role.
|
||||
role = ""
|
||||
# The ShardRegion retries registration and shard location requests to the
|
||||
# ShardCoordinator with this interval if it does not reply.
|
||||
retry-interval = 2 s
|
||||
# Maximum number of messages that are buffered by a ShardRegion actor.
|
||||
buffer-size = 100000
|
||||
# Timeout of the shard rebalancing process.
|
||||
handoff-timeout = 60 s
|
||||
# Rebalance check is performed periodically with this interval.
|
||||
rebalance-interval = 10 s
|
||||
# How often the coordinator saves persistent snapshots, which are
|
||||
# used to reduce recovery times
|
||||
snapshot-interval = 3600 s
|
||||
# Setting for the default shard allocation strategy
|
||||
least-shard-allocation-strategy {
|
||||
# Threshold of how large the difference between most and least number of
|
||||
# allocated shards must be to begin the rebalancing.
|
||||
rebalance-threshold = 10
|
||||
# The number of ongoing rebalancing processes is limited to this number.
|
||||
max-simultaneous-rebalance = 3
|
||||
}
|
||||
}
|
||||
# //#sharding-ext-config
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -32,8 +32,8 @@ object ClusterSingletonManager {
|
|||
singletonName: String,
|
||||
terminationMessage: Any,
|
||||
role: Option[String],
|
||||
maxHandOverRetries: Int = 20,
|
||||
maxTakeOverRetries: Int = 15,
|
||||
maxHandOverRetries: Int = 10,
|
||||
maxTakeOverRetries: Int = 5,
|
||||
retryInterval: FiniteDuration = 1.second): Props =
|
||||
Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role,
|
||||
maxHandOverRetries, maxTakeOverRetries, retryInterval).withDeploy(Deploy.local)
|
||||
|
|
@ -190,9 +190,8 @@ object ClusterSingletonManager {
|
|||
}
|
||||
|
||||
def handleInitial(state: CurrentClusterState): Unit = {
|
||||
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect {
|
||||
case m if m.status == MemberStatus.Up && matchingRole(m) ⇒ m
|
||||
}
|
||||
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒
|
||||
m.status == MemberStatus.Up && matchingRole(m))
|
||||
val initial = InitialOldestState(membersByAge.headOption.map(_.address), membersByAge.size)
|
||||
changes :+= initial
|
||||
}
|
||||
|
|
@ -277,11 +276,11 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
|
|||
*
|
||||
* The cluster failure detector will notice when oldest node
|
||||
* becomes unreachable due to things like JVM crash, hard shut down,
|
||||
* or network failure. Then a new oldest node will take over and a
|
||||
* new singleton actor is created. For these failure scenarios there
|
||||
* will not be a graceful hand-over, but more than one active singletons
|
||||
* is prevented by all reasonable means. Some corner cases are eventually
|
||||
* resolved by configurable timeouts.
|
||||
* or network failure. When the crashed node has been removed (via down) from the
|
||||
* cluster then a new oldest node will take over and a new singleton actor is
|
||||
* created. For these failure scenarios there will not be a graceful hand-over,
|
||||
* but more than one active singletons is prevented by all reasonable means. Some
|
||||
* corner cases are eventually resolved by configurable timeouts.
|
||||
*
|
||||
* You access the singleton actor with `actorSelection` using the names you have
|
||||
* specified when creating the ClusterSingletonManager. You can subscribe to
|
||||
|
|
|
|||
|
|
@ -0,0 +1,483 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.contrib.pattern
|
||||
|
||||
import language.postfixOps
|
||||
import scala.concurrent.duration._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.actor.Identify
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Props
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.ClusterEvent._
|
||||
import akka.persistence.EventsourcedProcessor
|
||||
import akka.persistence.Persistence
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbJournal
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbStore
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.testkit.STMultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent.Mute
|
||||
import java.io.File
|
||||
import org.apache.commons.io.FileUtils
|
||||
import akka.actor.ReceiveTimeout
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object ClusterShardingSpec extends MultiNodeConfig {
|
||||
val controller = role("controller")
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
val fifth = role("fifth")
|
||||
val sixth = role("sixth")
|
||||
|
||||
commonConfig(ConfigFactory.parseString("""
|
||||
akka.loglevel = INFO
|
||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.cluster.auto-down-unreachable-after = 0s
|
||||
akka.cluster.roles = ["backend"]
|
||||
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
||||
akka.persistence.journal.leveldb-shared.store {
|
||||
native = off
|
||||
dir = "target/shared-journal"
|
||||
}
|
||||
akka.persistence.snapshot-store.local.dir = "target/snapshots"
|
||||
akka.contrib.cluster.sharding {
|
||||
role = backend
|
||||
retry-interval = 1 s
|
||||
handoff-timeout = 10 s
|
||||
rebalance-interval = 2 s
|
||||
least-shard-allocation-strategy {
|
||||
rebalance-threshold = 2
|
||||
max-simultaneous-rebalance = 1
|
||||
}
|
||||
}
|
||||
"""))
|
||||
|
||||
nodeConfig(sixth) {
|
||||
ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""")
|
||||
}
|
||||
|
||||
//#counter-actor
|
||||
case object Increment
|
||||
case object Decrement
|
||||
case class Get(counterId: Long)
|
||||
case class EntryEnvelope(id: Long, payload: Any)
|
||||
|
||||
case object Stop
|
||||
case class CounterChanged(delta: Int)
|
||||
|
||||
class Counter extends EventsourcedProcessor {
|
||||
import ShardRegion.Passivate
|
||||
|
||||
context.setReceiveTimeout(120.seconds)
|
||||
|
||||
var count = 0
|
||||
//#counter-actor
|
||||
|
||||
override def postStop(): Unit = {
|
||||
super.postStop()
|
||||
// Simulate that the passivation takes some time, to verify passivation bufffering
|
||||
Thread.sleep(500)
|
||||
}
|
||||
//#counter-actor
|
||||
|
||||
def updateState(event: CounterChanged): Unit =
|
||||
count += event.delta
|
||||
|
||||
override def receiveReplay: Receive = {
|
||||
case evt: CounterChanged ⇒ updateState(evt)
|
||||
}
|
||||
|
||||
override def receiveCommand: Receive = {
|
||||
case Increment ⇒ persist(CounterChanged(+1))(updateState)
|
||||
case Decrement ⇒ persist(CounterChanged(-1))(updateState)
|
||||
case Get(_) ⇒ sender ! count
|
||||
case ReceiveTimeout ⇒ context.parent ! Passivate(stopMessage = Stop)
|
||||
case Stop ⇒ context.stop(self)
|
||||
}
|
||||
}
|
||||
//#counter-actor
|
||||
|
||||
//#counter-extractor
|
||||
val idExtractor: ShardRegion.IdExtractor = {
|
||||
case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
|
||||
case msg @ Get(id) ⇒ (id.toString, msg)
|
||||
}
|
||||
|
||||
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
|
||||
case EntryEnvelope(id, _) ⇒ (id % 10).toString
|
||||
case Get(id) ⇒ (id % 10).toString
|
||||
}
|
||||
//#counter-extractor
|
||||
|
||||
}
|
||||
|
||||
class ClusterShardingMultiJvmNode1 extends ClusterShardingSpec
|
||||
class ClusterShardingMultiJvmNode2 extends ClusterShardingSpec
|
||||
class ClusterShardingMultiJvmNode3 extends ClusterShardingSpec
|
||||
class ClusterShardingMultiJvmNode4 extends ClusterShardingSpec
|
||||
class ClusterShardingMultiJvmNode5 extends ClusterShardingSpec
|
||||
class ClusterShardingMultiJvmNode6 extends ClusterShardingSpec
|
||||
class ClusterShardingMultiJvmNode7 extends ClusterShardingSpec
|
||||
|
||||
class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMultiNodeSpec with ImplicitSender {
|
||||
import ClusterShardingSpec._
|
||||
|
||||
override def initialParticipants = roles.size
|
||||
|
||||
val storageLocations = List(
|
||||
"akka.persistence.journal.leveldb.dir",
|
||||
"akka.persistence.journal.leveldb-shared.store.dir",
|
||||
"akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
|
||||
|
||||
override protected def atStartup() {
|
||||
runOn(controller) {
|
||||
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
|
||||
}
|
||||
}
|
||||
|
||||
override protected def afterTermination() {
|
||||
runOn(controller) {
|
||||
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
|
||||
}
|
||||
}
|
||||
|
||||
def join(from: RoleName, to: RoleName): Unit = {
|
||||
runOn(from) {
|
||||
Cluster(system) join node(to).address
|
||||
createCoordinator()
|
||||
}
|
||||
enterBarrier(from.name + "-joined")
|
||||
}
|
||||
|
||||
def createCoordinator(): Unit = {
|
||||
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
|
||||
system.actorOf(ClusterSingletonManager.props(
|
||||
singletonProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds,
|
||||
snapshotInterval = 3600.seconds, allocationStrategy),
|
||||
singletonName = "singleton",
|
||||
terminationMessage = PoisonPill,
|
||||
role = None),
|
||||
name = "counterCoordinator")
|
||||
}
|
||||
|
||||
lazy val region = system.actorOf(ShardRegion.props(
|
||||
entryProps = Props[Counter],
|
||||
role = None,
|
||||
coordinatorPath = "/user/counterCoordinator/singleton",
|
||||
retryInterval = 1.second,
|
||||
bufferSize = 1000,
|
||||
idExtractor = idExtractor,
|
||||
shardResolver = shardResolver),
|
||||
name = "counterRegion")
|
||||
|
||||
"Cluster sharding" must {
|
||||
|
||||
"setup shared journal" in {
|
||||
// start the Persistence extension
|
||||
Persistence(system)
|
||||
runOn(controller) {
|
||||
system.actorOf(Props[SharedLeveldbStore], "store")
|
||||
}
|
||||
enterBarrier("peristence-started")
|
||||
|
||||
runOn(first, second, third, fourth, fifth, sixth) {
|
||||
system.actorSelection(node(controller) / "user" / "store") ! Identify(None)
|
||||
val sharedStore = expectMsgType[ActorIdentity].ref.get
|
||||
SharedLeveldbJournal.setStore(sharedStore, system)
|
||||
}
|
||||
|
||||
enterBarrier("after-1")
|
||||
}
|
||||
|
||||
"work in single node cluster" in within(20 seconds) {
|
||||
join(first, first)
|
||||
|
||||
runOn(first) {
|
||||
region ! EntryEnvelope(1, Increment)
|
||||
region ! EntryEnvelope(1, Increment)
|
||||
region ! EntryEnvelope(1, Increment)
|
||||
region ! EntryEnvelope(1, Decrement)
|
||||
region ! Get(1)
|
||||
expectMsg(2)
|
||||
}
|
||||
|
||||
enterBarrier("after-2")
|
||||
}
|
||||
|
||||
"use second node" in within(20 seconds) {
|
||||
join(second, first)
|
||||
|
||||
runOn(second) {
|
||||
region ! EntryEnvelope(2, Increment)
|
||||
region ! EntryEnvelope(2, Increment)
|
||||
region ! EntryEnvelope(2, Increment)
|
||||
region ! EntryEnvelope(2, Decrement)
|
||||
region ! Get(2)
|
||||
expectMsg(2)
|
||||
}
|
||||
enterBarrier("second-update")
|
||||
runOn(first) {
|
||||
region ! EntryEnvelope(2, Increment)
|
||||
region ! Get(2)
|
||||
expectMsg(3)
|
||||
lastSender.path must be(node(second) / "user" / "counterRegion" / "2")
|
||||
}
|
||||
enterBarrier("first-update")
|
||||
|
||||
runOn(second) {
|
||||
region ! Get(2)
|
||||
expectMsg(3)
|
||||
lastSender.path must be(region.path / "2")
|
||||
}
|
||||
|
||||
enterBarrier("after-3")
|
||||
}
|
||||
|
||||
"support passivation and activation of entries" in {
|
||||
runOn(second) {
|
||||
region ! Get(2)
|
||||
expectMsg(3)
|
||||
region ! EntryEnvelope(2, ReceiveTimeout)
|
||||
// let the Passivate-Stop roundtrip begin to trigger buffering of subsequent messages
|
||||
Thread.sleep(200)
|
||||
region ! EntryEnvelope(2, Increment)
|
||||
region ! Get(2)
|
||||
expectMsg(4)
|
||||
}
|
||||
enterBarrier("after-4")
|
||||
}
|
||||
|
||||
"failover shards on crashed node" in within(30 seconds) {
|
||||
// mute logging of deadLetters during shutdown of systems
|
||||
if (!log.isDebugEnabled)
|
||||
system.eventStream.publish(Mute(DeadLettersFilter[Any]))
|
||||
enterBarrier("logs-muted")
|
||||
|
||||
runOn(controller) {
|
||||
testConductor.exit(second, 0).await
|
||||
}
|
||||
enterBarrier("crash-second")
|
||||
|
||||
runOn(first) {
|
||||
val probe = TestProbe()
|
||||
awaitAssert {
|
||||
within(1.second) {
|
||||
region.tell(Get(2), probe.ref)
|
||||
probe.expectMsg(4)
|
||||
probe.lastSender.path must be(region.path / "2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("after-5")
|
||||
}
|
||||
|
||||
"use third and fourth node" in within(15 seconds) {
|
||||
join(third, first)
|
||||
join(fourth, first)
|
||||
|
||||
runOn(third) {
|
||||
for (_ ← 1 to 10)
|
||||
region ! EntryEnvelope(3, Increment)
|
||||
region ! Get(3)
|
||||
expectMsg(10)
|
||||
}
|
||||
enterBarrier("third-update")
|
||||
|
||||
runOn(fourth) {
|
||||
for (_ ← 1 to 20)
|
||||
region ! EntryEnvelope(4, Increment)
|
||||
region ! Get(4)
|
||||
expectMsg(20)
|
||||
}
|
||||
enterBarrier("fourth-update")
|
||||
|
||||
runOn(first) {
|
||||
region ! EntryEnvelope(3, Increment)
|
||||
region ! Get(3)
|
||||
expectMsg(11)
|
||||
lastSender.path must be(node(third) / "user" / "counterRegion" / "3")
|
||||
|
||||
region ! EntryEnvelope(4, Increment)
|
||||
region ! Get(4)
|
||||
expectMsg(21)
|
||||
lastSender.path must be(node(fourth) / "user" / "counterRegion" / "4")
|
||||
}
|
||||
enterBarrier("first-update")
|
||||
|
||||
runOn(third) {
|
||||
region ! Get(3)
|
||||
expectMsg(11)
|
||||
lastSender.path must be(region.path / "3")
|
||||
}
|
||||
|
||||
runOn(fourth) {
|
||||
region ! Get(4)
|
||||
expectMsg(21)
|
||||
lastSender.path must be(region.path / "4")
|
||||
}
|
||||
|
||||
enterBarrier("after-6")
|
||||
}
|
||||
|
||||
"recover coordinator state after coordinator crash" in within(60 seconds) {
|
||||
join(fifth, fourth)
|
||||
|
||||
runOn(controller) {
|
||||
testConductor.exit(first, 0).await
|
||||
}
|
||||
enterBarrier("crash-first")
|
||||
|
||||
runOn(fifth) {
|
||||
val probe3 = TestProbe()
|
||||
awaitAssert {
|
||||
within(1.second) {
|
||||
region.tell(Get(3), probe3.ref)
|
||||
probe3.expectMsg(11)
|
||||
probe3.lastSender.path must be(node(third) / "user" / "counterRegion" / "3")
|
||||
}
|
||||
}
|
||||
val probe4 = TestProbe()
|
||||
awaitAssert {
|
||||
within(1.second) {
|
||||
region.tell(Get(4), probe4.ref)
|
||||
probe4.expectMsg(21)
|
||||
probe4.lastSender.path must be(node(fourth) / "user" / "counterRegion" / "4")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
enterBarrier("after-7")
|
||||
}
|
||||
|
||||
"rebalance to nodes with less shards" in within(30 seconds) {
|
||||
|
||||
runOn(fourth) {
|
||||
// third, fourth and fifth are still alive
|
||||
// shards 3 and 4 are already allocated
|
||||
// make sure shards 1 and 2 (previously on crashed first) are allocated
|
||||
awaitAssert {
|
||||
val probe1 = TestProbe()
|
||||
within(1.second) {
|
||||
region.tell(Get(1), probe1.ref)
|
||||
probe1.expectMsg(2)
|
||||
}
|
||||
}
|
||||
awaitAssert {
|
||||
val probe2 = TestProbe()
|
||||
within(1.second) {
|
||||
region.tell(Get(2), probe2.ref)
|
||||
probe2.expectMsg(4)
|
||||
}
|
||||
}
|
||||
|
||||
// add more shards, which should later trigger rebalance to new node sixth
|
||||
for (n ← 5 to 10)
|
||||
region ! EntryEnvelope(n, Increment)
|
||||
|
||||
for (n ← 5 to 10) {
|
||||
region ! Get(n)
|
||||
expectMsg(1)
|
||||
}
|
||||
}
|
||||
enterBarrier("more-added")
|
||||
|
||||
join(sixth, third)
|
||||
|
||||
runOn(sixth) {
|
||||
awaitAssert {
|
||||
val probe = TestProbe()
|
||||
within(3.seconds) {
|
||||
var count = 0
|
||||
for (n ← 1 to 10) {
|
||||
region.tell(Get(n), probe.ref)
|
||||
probe.expectMsgType[Int]
|
||||
if (probe.lastSender.path == region.path / n.toString)
|
||||
count += 1
|
||||
}
|
||||
count must be(2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("after-8")
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
"support proxy only mode" in within(10.seconds) {
|
||||
runOn(sixth) {
|
||||
val proxy = system.actorOf(ShardRegion.proxyProps(
|
||||
role = None,
|
||||
coordinatorPath = "/user/counterCoordinator/singleton",
|
||||
retryInterval = 1.second,
|
||||
bufferSize = 1000,
|
||||
idExtractor = idExtractor,
|
||||
shardResolver = shardResolver),
|
||||
name = "regionProxy")
|
||||
|
||||
proxy ! Get(1)
|
||||
expectMsg(2)
|
||||
proxy ! Get(2)
|
||||
expectMsg(4)
|
||||
}
|
||||
enterBarrier("after-9")
|
||||
}
|
||||
|
||||
"easy to use with extensions" in within(50.seconds) {
|
||||
runOn(third, fourth, fifth, sixth) {
|
||||
//#counter-start
|
||||
ClusterSharding(system).start(
|
||||
typeName = "Counter",
|
||||
entryProps = Some(Props[Counter]),
|
||||
idExtractor = idExtractor,
|
||||
shardResolver = shardResolver)
|
||||
//#counter-start
|
||||
ClusterSharding(system).start(
|
||||
typeName = "AnotherCounter",
|
||||
entryProps = Some(Props[Counter]),
|
||||
idExtractor = idExtractor,
|
||||
shardResolver = shardResolver)
|
||||
}
|
||||
enterBarrier("extension-started")
|
||||
runOn(fifth) {
|
||||
//#counter-usage
|
||||
val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
|
||||
counterRegion ! Get(100)
|
||||
expectMsg(0)
|
||||
|
||||
counterRegion ! EntryEnvelope(100, Increment)
|
||||
counterRegion ! Get(100)
|
||||
expectMsg(1)
|
||||
//#counter-usage
|
||||
|
||||
ClusterSharding(system).shardRegion("AnotherCounter") ! EntryEnvelope(100, Decrement)
|
||||
ClusterSharding(system).shardRegion("AnotherCounter") ! Get(100)
|
||||
expectMsg(-1)
|
||||
}
|
||||
|
||||
enterBarrier("extension-used")
|
||||
|
||||
// sixth is a frontend node, i.e. proxy only
|
||||
runOn(sixth) {
|
||||
for (n ← 1000 to 1010) {
|
||||
ClusterSharding(system).shardRegion("Counter") ! EntryEnvelope(n, Increment)
|
||||
ClusterSharding(system).shardRegion("Counter") ! Get(n)
|
||||
expectMsg(1)
|
||||
lastSender.path.address must not be (Cluster(system).selfAddress)
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("after-9")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -27,6 +27,7 @@ import akka.actor.Terminated
|
|||
import akka.actor.Identify
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.actor.ActorSelection
|
||||
import akka.cluster.MemberStatus
|
||||
|
||||
object ClusterSingletonManagerSpec extends MultiNodeConfig {
|
||||
val controller = role("controller")
|
||||
|
|
@ -156,9 +157,8 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
|
|||
|
||||
def receive = {
|
||||
case state: CurrentClusterState ⇒
|
||||
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect {
|
||||
case m if m.hasRole(role) ⇒ m
|
||||
}
|
||||
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒
|
||||
m.status == MemberStatus.Up && m.hasRole(role))
|
||||
case MemberUp(m) ⇒ if (m.hasRole(role)) membersByAge += m
|
||||
case MemberRemoved(m, _) ⇒ if (m.hasRole(role)) membersByAge -= m
|
||||
case other ⇒ consumer foreach { _.tell(other, sender) }
|
||||
|
|
|
|||
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.PoisonPill;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.ReceiveTimeout;
|
||||
import akka.japi.Procedure;
|
||||
import akka.persistence.UntypedEventsourcedProcessor;
|
||||
|
||||
// Doc code, compile only
|
||||
public class ClusterShardingTest {
|
||||
|
||||
ActorSystem system = null;
|
||||
|
||||
ActorRef getSelf() {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void demonstrateUsage() {
|
||||
//#counter-extractor
|
||||
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
|
||||
|
||||
@Override
|
||||
public String entryId(Object message) {
|
||||
if (message instanceof Counter.EntryEnvelope)
|
||||
return String.valueOf(((Counter.EntryEnvelope) message).id);
|
||||
else if (message instanceof Counter.Get)
|
||||
return String.valueOf(((Counter.Get) message).counterId);
|
||||
else
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object entryMessage(Object message) {
|
||||
if (message instanceof Counter.EntryEnvelope)
|
||||
return ((Counter.EntryEnvelope) message).payload;
|
||||
else
|
||||
return message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String shardId(Object message) {
|
||||
if (message instanceof Counter.EntryEnvelope) {
|
||||
long id = ((Counter.EntryEnvelope) message).id;
|
||||
return String.valueOf(id % 10);
|
||||
} else if (message instanceof Counter.Get) {
|
||||
long id = ((Counter.Get) message).counterId;
|
||||
return String.valueOf(id % 10);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
//#counter-extractor
|
||||
|
||||
//#counter-start
|
||||
ClusterSharding.get(system).start("Counter", Props.create(Counter.class),
|
||||
messageExtractor);
|
||||
//#counter-start
|
||||
|
||||
//#counter-usage
|
||||
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
|
||||
counterRegion.tell(new Counter.Get(100), getSelf());
|
||||
|
||||
counterRegion.tell(new Counter.EntryEnvelope(100,
|
||||
Counter.CounterOp.INCREMENT), getSelf());
|
||||
counterRegion.tell(new Counter.Get(100), getSelf());
|
||||
//#counter-usage
|
||||
}
|
||||
|
||||
static//#counter-actor
|
||||
public class Counter extends UntypedEventsourcedProcessor {
|
||||
|
||||
public static enum CounterOp {
|
||||
INCREMENT, DECREMENT
|
||||
}
|
||||
|
||||
public static class Get {
|
||||
final public long counterId;
|
||||
|
||||
public Get(long counterId) {
|
||||
this.counterId = counterId;
|
||||
}
|
||||
}
|
||||
|
||||
public static class EntryEnvelope {
|
||||
final public long id;
|
||||
final public Object payload;
|
||||
|
||||
public EntryEnvelope(long id, Object payload) {
|
||||
this.id = id;
|
||||
this.payload = payload;
|
||||
}
|
||||
}
|
||||
|
||||
public static class CounterChanged {
|
||||
final public int delta;
|
||||
|
||||
public CounterChanged(int delta) {
|
||||
this.delta = delta;
|
||||
}
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
|
||||
@Override
|
||||
public void preStart() throws Exception {
|
||||
super.preStart();
|
||||
context().setReceiveTimeout(Duration.create(120, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
void updateState(CounterChanged event) {
|
||||
count += event.delta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceiveReplay(Object msg) {
|
||||
if (msg instanceof CounterChanged)
|
||||
updateState((CounterChanged) msg);
|
||||
else
|
||||
unhandled(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceiveCommand(Object msg) {
|
||||
if (msg instanceof Get)
|
||||
getSender().tell(count, getSelf());
|
||||
|
||||
else if (msg == CounterOp.INCREMENT)
|
||||
persist(new CounterChanged(+1), new Procedure<CounterChanged>() {
|
||||
public void apply(CounterChanged evt) {
|
||||
updateState(evt);
|
||||
}
|
||||
});
|
||||
|
||||
else if (msg == CounterOp.DECREMENT)
|
||||
persist(new CounterChanged(-1), new Procedure<CounterChanged>() {
|
||||
public void apply(CounterChanged evt) {
|
||||
updateState(evt);
|
||||
}
|
||||
});
|
||||
|
||||
else if (msg.equals(ReceiveTimeout.getInstance()))
|
||||
getContext().parent().tell(
|
||||
new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
|
||||
|
||||
else
|
||||
unhandled(msg);
|
||||
}
|
||||
}
|
||||
|
||||
//#counter-actor
|
||||
|
||||
}
|
||||
|
|
@ -21,6 +21,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState;
|
|||
import akka.cluster.ClusterEvent.MemberEvent;
|
||||
import akka.cluster.ClusterEvent.MemberUp;
|
||||
import akka.cluster.ClusterEvent.MemberRemoved;
|
||||
import akka.cluster.MemberStatus;
|
||||
|
||||
public class ClusterSingletonManagerTest {
|
||||
|
||||
|
|
@ -30,9 +31,7 @@ public class ClusterSingletonManagerTest {
|
|||
final ActorRef testActor = null;
|
||||
|
||||
//#create-singleton-manager
|
||||
system.actorOf(
|
||||
ClusterSingletonManager.defaultProps(
|
||||
Props.create(Consumer.class, queue, testActor), "consumer",
|
||||
system.actorOf(ClusterSingletonManager.defaultProps(Props.create(Consumer.class, queue, testActor), "consumer",
|
||||
new End(), "worker"), "singleton");
|
||||
//#create-singleton-manager
|
||||
}
|
||||
|
|
@ -75,7 +74,7 @@ public class ClusterSingletonManagerTest {
|
|||
CurrentClusterState state = (CurrentClusterState) message;
|
||||
List<Member> members = new ArrayList<Member>();
|
||||
for (Member m : state.getMembers()) {
|
||||
if (m.hasRole(role))
|
||||
if (m.status().equals(MemberStatus.up()) && m.hasRole(role))
|
||||
members.add(m);
|
||||
}
|
||||
membersByAge.clear();
|
||||
|
|
@ -101,15 +100,16 @@ public class ClusterSingletonManagerTest {
|
|||
}
|
||||
|
||||
ActorSelection currentMaster() {
|
||||
return getContext().actorSelection(membersByAge.first().address() +
|
||||
"/user/singleton/statsService");
|
||||
return getContext().actorSelection(membersByAge.first().address() + "/user/singleton/statsService");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//#singleton-proxy
|
||||
|
||||
public static class End {
|
||||
}
|
||||
|
||||
public static class End {}
|
||||
|
||||
public static class Consumer {}
|
||||
public static class Consumer {
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.contrib.pattern
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Props
|
||||
|
||||
class LeastShardAllocationStrategySpec extends AkkaSpec {
|
||||
import ShardCoordinator._
|
||||
|
||||
val regionA = system.actorOf(Props.empty, "regionA")
|
||||
val regionB = system.actorOf(Props.empty, "regionB")
|
||||
val regionC = system.actorOf(Props.empty, "regionC")
|
||||
|
||||
val allocationStrategy = new LeastShardAllocationStrategy(rebalanceThreshold = 3, maxSimultaneousRebalance = 2)
|
||||
|
||||
"LeastShardAllocationStrategy" must {
|
||||
"allocate to region with least number of shards" in {
|
||||
val allocations = Map(regionA -> Vector("shard1"), regionB -> Vector("shard2"), regionC -> Vector.empty)
|
||||
allocationStrategy.allocateShard(regionA, "shard3", allocations) must be(regionC)
|
||||
}
|
||||
|
||||
"rebalance from region with most number of shards" in {
|
||||
val allocations = Map(regionA -> Vector("shard1"), regionB -> Vector("shard2", "shard3"),
|
||||
regionC -> Vector.empty)
|
||||
|
||||
// so far regionB has 2 shards and regionC has 0 shards, but the diff is less than rebalanceThreshold
|
||||
allocationStrategy.rebalance(allocations, Set.empty) must be(Set.empty)
|
||||
|
||||
val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4"))
|
||||
allocationStrategy.rebalance(allocations2, Set.empty) must be(Set("shard2"))
|
||||
allocationStrategy.rebalance(allocations2, Set("shard4")) must be(Set.empty)
|
||||
|
||||
val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6"))
|
||||
allocationStrategy.rebalance(allocations3, Set("shard1")) must be(Set("shard2"))
|
||||
}
|
||||
|
||||
"must limit number of simultanious rebalance" in {
|
||||
val allocations = Map(regionA -> Vector("shard1"),
|
||||
regionB -> Vector("shard2", "shard3", "shard4", "shard5", "shard6"), regionC -> Vector.empty)
|
||||
|
||||
allocationStrategy.rebalance(allocations, Set("shard2")) must be(Set("shard3"))
|
||||
allocationStrategy.rebalance(allocations, Set("shard2", "shard3")) must be(Set.empty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -275,6 +275,15 @@ cases to consider. Therefore, this specific use case is made easily accessible b
|
|||
:ref:`cluster-singleton` in the contrib module. You can use it as is, or adjust to fit
|
||||
your specific needs.
|
||||
|
||||
Cluster Sharding
|
||||
^^^^^^^^^^^^^^^^
|
||||
|
||||
When you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine
|
||||
you need to distribute them across several nodes in the cluster. You want to be able to interact with them using their
|
||||
logical identifier, but without having to care about their physical location in the cluster.
|
||||
|
||||
See :ref:`cluster-sharding` in the contrib module.
|
||||
|
||||
Distributed Publish Subscribe Pattern
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
|
|||
|
|
@ -269,6 +269,15 @@ cases to consider. Therefore, this specific use case is made easily accessible b
|
|||
:ref:`cluster-singleton` in the contrib module. You can use it as is, or adjust to fit
|
||||
your specific needs.
|
||||
|
||||
Cluster Sharding
|
||||
^^^^^^^^^^^^^^^^
|
||||
|
||||
When you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine
|
||||
you need to distribute them across several nodes in the cluster. You want to be able to interact with them using their
|
||||
logical identifier, but without having to care about their physical location in the cluster.
|
||||
|
||||
See :ref:`cluster-sharding` in the contrib module.
|
||||
|
||||
Distributed Publish Subscribe Pattern
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
|
|
|
|||
|
|
@ -650,7 +650,7 @@ object AkkaBuild extends Build {
|
|||
lazy val contrib = Project(
|
||||
id = "akka-contrib",
|
||||
base = file("akka-contrib"),
|
||||
dependencies = Seq(remote, remoteTests % "test->test", cluster),
|
||||
dependencies = Seq(remote, remoteTests % "test->test", cluster, persistence),
|
||||
settings = defaultSettings ++ formatSettings ++ scaladocSettings ++ javadocSettings ++ multiJvmSettings ++ Seq(
|
||||
libraryDependencies ++= Dependencies.contrib,
|
||||
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
|
||||
|
|
@ -1247,7 +1247,7 @@ object Dependencies {
|
|||
|
||||
val clusterSample = Seq(Test.scalatest, sigar)
|
||||
|
||||
val contrib = Seq(Test.junitIntf)
|
||||
val contrib = Seq(Test.junitIntf, Test.commonsIo)
|
||||
|
||||
val multiNodeSample = Seq(Test.scalatest)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue