Merge remote-tracking branch 'pr/15597'
Commit 3d0cf9d602
7 changed files with 908 additions and 163 deletions
@@ -8,12 +8,12 @@ be able to interact with them using their logical identifier, but without having
 their physical location in the cluster, which might also change over time.
 
 It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology.
 Here we call these actors "entries". These actors typically have persistent (durable) state,
 but this feature is not limited to actors with persistent state.
 
 Cluster sharding is typically used when you have many stateful actors that together consume
 more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
 it might be easier to run them on a :ref:`cluster-singleton` node.
 
 In this context sharding means that actors with an identifier, so called entries,
 can be automatically distributed across multiple nodes in the cluster. Each entry
@@ -46,21 +46,21 @@ identifier and the shard identifier from incoming messages.
 
 .. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-extractor
 
 This example illustrates two different ways to define the entry identifier in the messages:
 
 * The ``Get`` message includes the identifier itself.
 * The ``EntryEnvelope`` holds the identifier, and the actual message that is
   sent to the entry actor is wrapped in the envelope.
 
 Note how these two message types are handled in the ``entryId`` and ``entryMessage`` methods shown above.
 
 A shard is a group of entries that will be managed together. The grouping is defined by the
 ``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
 in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
 As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
 of cluster nodes.
 
 Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
 named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
 look up the location of the shard for the entry if it does not already know its location. It will
 delegate the message to the right node and it will create the entry actor on demand, i.e. when the
@@ -92,21 +92,21 @@ identifier and the shard identifier from incoming messages.
 
 .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-extractor
 
 This example illustrates two different ways to define the entry identifier in the messages:
 
 * The ``Get`` message includes the identifier itself.
 * The ``EntryEnvelope`` holds the identifier, and the actual message that is
   sent to the entry actor is wrapped in the envelope.
 
 Note how these two message types are handled in the ``idExtractor`` function shown above.
 
 A shard is a group of entries that will be managed together. The grouping is defined by the
 ``shardResolver`` function shown above. Creating a good sharding algorithm is an interesting challenge
 in itself. Try to produce a uniform distribution, i.e. same amount of entries in each shard.
 As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number
 of cluster nodes.
 
 Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor for a
 named entry type can be retrieved with ``ClusterSharding.shardRegion``. The ``ShardRegion`` will
 look up the location of the shard for the entry if it does not already know its location. It will
 delegate the message to the right node and it will create the entry actor on demand, i.e. when the
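The ``#counter-extractor`` snippet referenced above is not reproduced in this diff; as a rough sketch (not the actual snippet), such extractor functions could look as follows, assuming the ``Get``/``EntryEnvelope`` messages described above and an arbitrary choice of 12 shards::

   import akka.contrib.pattern.ShardRegion

   case class Get(counterId: Long)
   case class EntryEnvelope(id: Long, payload: Any)

   // Extract the entry identifier and the message to deliver to that entry.
   val idExtractor: ShardRegion.IdExtractor = {
     case EntryEnvelope(id, payload) ⇒ (id.toString, payload) // unwrap the envelope
     case msg @ Get(id)              ⇒ (id.toString, msg)     // the id is part of the message
   }

   // Map a message to a shard identifier; 12 shards is an illustrative value.
   val shardResolver: ShardRegion.ShardResolver = {
     case EntryEnvelope(id, _) ⇒ (id % 12).toString
     case Get(id)              ⇒ (id % 12).toString
   }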
@@ -123,14 +123,16 @@ How it works
 The ``ShardRegion`` actor is started on each node in the cluster, or group of nodes
 tagged with a specific role. The ``ShardRegion`` is created with two application specific
 functions to extract the entry identifier and the shard identifier from incoming messages.
 A shard is a group of entries that will be managed together. For the first message in a
 specific shard the ``ShardRegion`` requests the location of the shard from a central coordinator,
 the ``ShardCoordinator``.
 
-The ``ShardCoordinator`` decides which ``ShardRegion`` that
-owns the shard. The ``ShardRegion`` receives the decided home of the shard
-and if that is the ``ShardRegion`` instance itself it will create a local child
-actor representing the entry and direct all messages for that entry to it.
+The ``ShardCoordinator`` decides which ``ShardRegion`` shall own the ``Shard`` and informs
+that ``ShardRegion``. The region will confirm this request and create the ``Shard`` supervisor
+as a child actor. The individual ``Entries`` will then be created when needed by the ``Shard``
+actor. Incoming messages thus travel via the ``ShardRegion`` and the ``Shard`` to the target
+``Entry``.
+
 If the shard home is another ``ShardRegion`` instance messages will be forwarded
 to that ``ShardRegion`` instance instead. While resolving the location of a
 shard incoming messages for that shard are buffered and later delivered when the
@@ -139,20 +141,20 @@ to the target destination immediately without involving the ``ShardCoordinator``
 
 Scenario 1:
 
 #. Incoming message M1 to ``ShardRegion`` instance R1.
 #. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1.
 #. C answers that the home of S1 is R1.
 #. R1 creates child actor for the entry E1 and sends buffered messages for S1 to E1 child
 #. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entry children as needed, and forwards messages to them.
 
 Scenario 2:
 
 #. Incoming message M2 to R1.
 #. M2 is mapped to S2. R1 doesn't know about S2, so it asks C for the location of S2.
 #. C answers that the home of S2 is R2.
 #. R1 sends buffered messages for S2 to R2
 #. All incoming messages for S2 which arrive at R1 can be handled by R1 without C. It forwards messages to R2.
 #. R2 receives message for S2, asks C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2).
 
 To make sure that at most one instance of a specific entry actor is running somewhere
 in the cluster it is important that all nodes have the same view of where the shards
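In both scenarios the sender only ever talks to its local ``ShardRegion``; buffering and shard resolution happen behind that ``ActorRef``. A minimal usage sketch, assuming an ``ActorSystem`` in scope as ``system`` and the illustrative ``Counter`` type and ``Get`` message from the sketch above::

   import akka.actor.ActorRef
   import akka.contrib.pattern.ClusterSharding

   // The region for a named entry type; it resolves shard locations,
   // buffers while resolving, and forwards or delivers locally.
   val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
   counterRegion ! Get(100) // reaches entry "100", wherever its shard lives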
@@ -226,12 +228,30 @@ reduce memory consumption. This is done by the application specific implementati
 the entry actors for example by defining receive timeout (``context.setReceiveTimeout``).
 If a message is already enqueued to the entry when it stops itself the enqueued message
 in the mailbox will be dropped. To support graceful passivation without losing such
-messages the entry actor can send ``ShardRegion.Passivate`` to its parent ``ShardRegion``.
+messages the entry actor can send ``ShardRegion.Passivate`` to its parent ``Shard``.
 The specified wrapped message in ``Passivate`` will be sent back to the entry, which is
-then supposed to stop itself. Incoming messages will be buffered by the ``ShardRegion``
+then supposed to stop itself. Incoming messages will be buffered by the ``Shard``
 between reception of ``Passivate`` and termination of the entry. Such buffered messages
 are thereafter delivered to a new incarnation of the entry.
 
+Remembering Entries
+-------------------
+
+The list of entries in each ``Shard`` can be made persistent (durable) by setting
+the ``rememberEntries`` flag to true when calling ``ClusterSharding.start``. When configured
+to remember entries, whenever a ``Shard`` is rebalanced onto another node or recovers after a
+crash it will recreate all the entries which were previously running in that ``Shard``. To
+permanently stop entries, a ``Passivate`` message must be sent to the parent ``Shard``, otherwise the
+entry will be automatically restarted after the entry restart backoff specified in the configuration.
+
+When ``rememberEntries`` is set to false, a ``Shard`` will not automatically restart any entries
+after a rebalance or recovering from a crash. Entries will only be started once the first message
+for that entry has been received in the ``Shard``. Entries will not be restarted if they stop without
+using a ``Passivate``.
+
+Note that the state of the entries themselves will not be restored unless they have been made persistent,
+e.g. with ``akka-persistence``.
+
 Configuration
 -------------
 
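A minimal sketch of how an entry actor might combine passivation with the new ``rememberEntries`` flag described above; the idle timeout, the ``Counter`` actor and the reuse of the earlier ``idExtractor``/``shardResolver`` are assumptions for illustration::

   import scala.concurrent.duration._
   import akka.actor.{ Actor, ActorLogging, PoisonPill, Props, ReceiveTimeout }
   import akka.contrib.pattern.{ ClusterSharding, ShardRegion }

   class Counter extends Actor with ActorLogging {
     // Passivate idle entries; the parent Shard buffers messages that arrive
     // between Passivate and the actual termination of this actor.
     context.setReceiveTimeout(2.minutes)

     def receive = {
       case ReceiveTimeout ⇒ context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
       case msg            ⇒ log.info("got {}", msg)
     }
   }

   // With rememberEntries = true a rebalanced or recovered Shard re-creates its
   // entries; only entries stopped via Passivate stay permanently stopped.
   val region = ClusterSharding(system).start(
     typeName = "Counter",
     entryProps = Some(Props[Counter]),
     rememberEntries = true,
     idExtractor = idExtractor,
     shardResolver = shardResolver)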
@@ -93,6 +93,16 @@ akka.contrib.cluster.sharding {
 buffer-size = 100000
 # Timeout of the shard rebalancing process.
 handoff-timeout = 60 s
+# Time given to a region to acknowledge it's hosting a shard.
+shard-start-timeout = 10 s
+# If the shard can't store state changes it will retry the action
+# again after this duration. Any messages sent to an affected entry
+# will be buffered until the state change is processed
+shard-failure-backoff = 10 s
+# If the shard is remembering entries and an entry stops itself without
+# using passivate, the entry will be restarted after this duration or when
+# the next message for it is received, whichever occurs first.
+entry-restart-backoff = 10 s
 # Rebalance check is performed periodically with this interval.
 rebalance-interval = 10 s
 # How often the coordinator saves persistent snapshots, which are
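Applications that want different timings than these defaults would normally override them in their own configuration; a sketch under the assumption of a system named ``ClusterSystem``, with arbitrary example values::

   import akka.actor.ActorSystem
   import com.typesafe.config.ConfigFactory

   // Override the new sharding settings shown above; the values are examples only.
   val tuning = ConfigFactory.parseString("""
     akka.contrib.cluster.sharding {
       shard-start-timeout = 5 s
       shard-failure-backoff = 5 s
       entry-restart-backoff = 5 s
     }
     """)

   val system = ActorSystem("ClusterSystem", tuning.withFallback(ConfigFactory.load()))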
File diff suppressed because it is too large
@@ -4,13 +4,12 @@
 package akka.contrib.pattern
 
 import java.io.File
+import akka.contrib.pattern.ShardRegion.Passivate
 
 import scala.concurrent.duration._
 import org.apache.commons.io.FileUtils
 import com.typesafe.config.ConfigFactory
-import akka.actor.Actor
-import akka.actor.ActorIdentity
-import akka.actor.Identify
-import akka.actor.Props
+import akka.actor._
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent._
 import akka.persistence.Persistence
@@ -44,6 +43,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
 }
 akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec"
 akka.contrib.cluster.sharding.coordinator-failure-backoff = 3s
+akka.contrib.cluster.sharding.shard-failure-backoff = 3s
 """))
 
 testTransport(on = true)
@@ -111,6 +111,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
 ClusterSharding(system).start(
 typeName = "Entity",
 entryProps = Some(Props[Entity]),
+rememberEntries = true,
 idExtractor = idExtractor,
 shardResolver = shardResolver)
 }
@@ -143,10 +144,13 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
 runOn(first) {
 region ! Add("10", 1)
 region ! Add("20", 2)
+region ! Add("21", 3)
 region ! Get("10")
 expectMsg(Value("10", 1))
 region ! Get("20")
 expectMsg(Value("20", 2))
+region ! Get("21")
+expectMsg(Value("21", 3))
 }
 
 enterBarrier("after-2")
@@ -160,9 +164,29 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
 enterBarrier("journal-blackholed")
 
 runOn(first) {
+region ! Get("21")
+expectMsg(Value("21", 3))
+val entry21 = lastSender
+val shard2 = system.actorSelection(entry21.path.parent)
+
+//Test the ShardCoordinator allocating shards during a journal failure
 region ! Add("30", 3)
+
+//Test the Shard starting entries and persisting during a journal failure
+region ! Add("11", 1)
+
+//Test the Shard passivate works during a journal failure
+shard2.tell(Passivate(PoisonPill), entry21)
+region ! Add("21", 1)
+
+region ! Get("21")
+expectMsg(Value("21", 1))
+
 region ! Get("30")
 expectMsg(Value("30", 3))
 
+region ! Get("11")
+expectMsg(Value("11", 1))
 }
 
 runOn(controller) {
@@ -175,8 +199,11 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
 region ! Add("10", 1)
 region ! Add("20", 2)
 region ! Add("30", 3)
+region ! Add("11", 4)
 region ! Get("10")
 expectMsg(Value("10", 2))
+region ! Get("11")
+expectMsg(Value("11", 5))
 region ! Get("20")
 expectMsg(Value("20", 4))
 region ! Get("30")
@@ -3,6 +3,9 @@
 */
 package akka.contrib.pattern
 
+import akka.contrib.pattern.ShardCoordinator.Internal.{ ShardStopped, HandOff }
+import akka.contrib.pattern.ShardRegion.Passivate
+
 import language.postfixOps
 import scala.concurrent.duration._
 import com.typesafe.config.ConfigFactory
@@ -51,12 +54,14 @@ object ClusterShardingSpec extends MultiNodeConfig {
 role = backend
 retry-interval = 1 s
 handoff-timeout = 10 s
+shard-start-timeout = 5s
+entry-restart-backoff = 1s
 rebalance-interval = 2 s
 least-shard-allocation-strategy {
 rebalance-threshold = 2
 max-simultaneous-rebalance = 1
 }
 }
 """))
 
 nodeConfig(sixth) {
@@ -77,9 +82,9 @@ object ClusterShardingSpec extends MultiNodeConfig {
 
 context.setReceiveTimeout(120.seconds)
 
-// self.path.parent.name is the type name (utf-8 URL-encoded)
+// self.path.parent.parent.name is the type name (utf-8 URL-encoded)
 // self.path.name is the entry identifier (utf-8 URL-encoded)
-override def persistenceId: String = self.path.parent.name + "-" + self.path.name
+override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name
 
 var count = 0
 //#counter-actor
@@ -162,7 +167,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 
 def createCoordinator(): Unit = {
 val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
-val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.second, rebalanceInterval = 2.seconds,
+val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, rebalanceInterval = 2.seconds,
 snapshotInterval = 3600.seconds, allocationStrategy)
 system.actorOf(ClusterSingletonManager.props(
 singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps),
@@ -173,15 +178,27 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 }
 
 lazy val region = system.actorOf(ShardRegion.props(
+typeName = "counter",
 entryProps = Props[Counter],
 role = None,
 coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
 retryInterval = 1.second,
+shardFailureBackoff = 1.second,
+entryRestartBackoff = 1.second,
+snapshotInterval = 1.hour,
 bufferSize = 1000,
+rememberEntries = false,
 idExtractor = idExtractor,
 shardResolver = shardResolver),
 name = "counterRegion")
 
+lazy val persistentRegion = ClusterSharding(system).start(
+typeName = "PersistentCounter",
+entryProps = Some(Props[Counter]),
+rememberEntries = true,
+idExtractor = idExtractor,
+shardResolver = shardResolver)
+
 "Cluster sharding" must {
 
 "setup shared journal" in {
@@ -239,22 +256,22 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 region ! EntryEnvelope(2, Increment)
 region ! Get(2)
 expectMsg(3)
-lastSender.path should be(node(second) / "user" / "counterRegion" / "2")
+lastSender.path should be(node(second) / "user" / "counterRegion" / "2" / "2")
 
 region ! Get(11)
 expectMsg(1)
 // local on first
-lastSender.path should be(region.path / "11")
+lastSender.path should be(region.path / "11" / "11")
 region ! Get(12)
 expectMsg(1)
-lastSender.path should be(node(second) / "user" / "counterRegion" / "12")
+lastSender.path should be(node(second) / "user" / "counterRegion" / "0" / "12")
 }
 enterBarrier("first-update")
 
 runOn(second) {
 region ! Get(2)
 expectMsg(3)
-lastSender.path should be(region.path / "2")
+lastSender.path should be(region.path / "2" / "2")
 }
 
 enterBarrier("after-3")
@@ -291,7 +308,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 within(1.second) {
 region.tell(Get(2), probe1.ref)
 probe1.expectMsg(4)
-probe1.lastSender.path should be(region.path / "2")
+probe1.lastSender.path should be(region.path / "2" / "2")
 }
 }
 val probe2 = TestProbe()
@@ -299,7 +316,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 within(1.second) {
 region.tell(Get(12), probe2.ref)
 probe2.expectMsg(1)
-probe2.lastSender.path should be(region.path / "12")
+probe2.lastSender.path should be(region.path / "0" / "12")
 }
 }
 }
@@ -331,25 +348,25 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 region ! EntryEnvelope(3, Increment)
 region ! Get(3)
 expectMsg(11)
-lastSender.path should be(node(third) / "user" / "counterRegion" / "3")
+lastSender.path should be(node(third) / "user" / "counterRegion" / "3" / "3")
 
 region ! EntryEnvelope(4, Increment)
 region ! Get(4)
 expectMsg(21)
-lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4")
+lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4" / "4")
 }
 enterBarrier("first-update")
 
 runOn(third) {
 region ! Get(3)
 expectMsg(11)
-lastSender.path should be(region.path / "3")
+lastSender.path should be(region.path / "3" / "3")
 }
 
 runOn(fourth) {
 region ! Get(4)
 expectMsg(21)
-lastSender.path should be(region.path / "4")
+lastSender.path should be(region.path / "4" / "4")
 }
 
 enterBarrier("after-6")
@@ -369,7 +386,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 within(1.second) {
 region.tell(Get(3), probe3.ref)
 probe3.expectMsg(11)
-probe3.lastSender.path should be(node(third) / "user" / "counterRegion" / "3")
+probe3.lastSender.path should be(node(third) / "user" / "counterRegion" / "3" / "3")
 }
 }
 val probe4 = TestProbe()
@@ -377,7 +394,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 within(1.second) {
 region.tell(Get(4), probe4.ref)
 probe4.expectMsg(21)
-probe4.lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4")
+probe4.lastSender.path should be(node(fourth) / "user" / "counterRegion" / "4" / "4")
 }
 }
 
@@ -407,7 +424,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 }
 }
 
 // add more shards, which should later trigger rebalance to new node sixth
 for (n ← 5 to 10)
 region ! EntryEnvelope(n, Increment)
 
@@ -428,7 +445,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 for (n ← 1 to 10) {
 region.tell(Get(n), probe.ref)
 probe.expectMsgType[Int]
-if (probe.lastSender.path == region.path / n.toString)
+if (probe.lastSender.path == region.path / (n % 12).toString / n.toString)
 count += 1
 }
 count should be(2)
@@ -444,6 +461,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 "support proxy only mode" in within(10.seconds) {
 runOn(sixth) {
 val proxy = system.actorOf(ShardRegion.proxyProps(
+typeName = "counter",
 role = None,
 coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
 retryInterval = 1.second,
@@ -466,12 +484,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 val counterRegion: ActorRef = ClusterSharding(system).start(
 typeName = "Counter",
 entryProps = Some(Props[Counter]),
+rememberEntries = false,
 idExtractor = idExtractor,
 shardResolver = shardResolver)
 //#counter-start
 ClusterSharding(system).start(
 typeName = "AnotherCounter",
 entryProps = Some(Props[Counter]),
+rememberEntries = false,
 idExtractor = idExtractor,
 shardResolver = shardResolver)
 }
@@ -512,6 +532,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
 typeName = "ApiTest",
 entryProps = Some(Props[Counter]),
+rememberEntries = false,
 idExtractor = idExtractor,
 shardResolver = shardResolver)
 
@@ -522,5 +543,263 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 enterBarrier("after-10")
 
 }
 
+"Persistent Cluster Shards" must {
+"recover entries upon restart" in within(50.seconds) {
+runOn(third, fourth, fifth) {
+ClusterSharding(system).start(
+typeName = "PersistentCounterEntries",
+entryProps = Some(Props[Counter]),
+rememberEntries = true,
+idExtractor = idExtractor,
+shardResolver = shardResolver)
+
+ClusterSharding(system).start(
+typeName = "AnotherPersistentCounter",
+entryProps = Some(Props[Counter]),
+rememberEntries = true,
+idExtractor = idExtractor,
+shardResolver = shardResolver)
+}
+enterBarrier("persistent-started")
+
+runOn(third) {
+val counterRegion: ActorRef = ClusterSharding(system).shardRegion("PersistentCounterEntries")
+
+//Create an increment counter 1
+counterRegion ! EntryEnvelope(1, Increment)
+counterRegion ! Get(1)
+expectMsg(1)
+
+//Shut down the shard and confirm it's dead
+val shard = system.actorSelection(lastSender.path.parent)
+val region = system.actorSelection(lastSender.path.parent.parent)
+
+//Stop the shard cleanly
+region ! HandOff("1")
+expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
+
+awaitAssert({
+shard ! Identify(1)
+expectMsg(1 second, "Shard was still around", ActorIdentity(1, None))
+}, 5 seconds, 500 millis)
+
+//Get the path to where the shard now resides
+counterRegion ! Get(13)
+expectMsg(0)
+
+//Check that counter 1 is now alive again, even though we have
+// not sent a message to it via the ShardRegion
+val counter1 = system.actorSelection(lastSender.path.parent / "1")
+counter1 ! Identify(2)
+receiveOne(1 second) match {
+case ActorIdentity(2, location) ⇒
+location should not be (None)
+}
+
+counter1 ! Get(1)
+expectMsg(1)
+}
+
+enterBarrier("after-shard-restart")
+
+runOn(fourth) {
+//Check a second region does not share the same persistent shards
+val anotherRegion: ActorRef = ClusterSharding(system).shardRegion("AnotherPersistentCounter")
+
+//Create a separate 13 counter
+anotherRegion ! EntryEnvelope(13, Increment)
+anotherRegion ! Get(13)
+expectMsg(1)
+
+//Check that no counter "1" exists in this shard
+val secondCounter1 = system.actorSelection(lastSender.path.parent / "1")
+secondCounter1 ! Identify(3)
+receiveOne(1 second) match {
+case ActorIdentity(3, location) ⇒
+location should be(None)
+}
+
+}
+enterBarrier("after-11")
+}
+
+"permanently stop entries which passivate" in within(50.seconds) {
+runOn(third, fourth, fifth) {
+persistentRegion
+}
+enterBarrier("cluster-started-12")
+
+runOn(third) {
+//Create and increment counter 1
+persistentRegion ! EntryEnvelope(1, Increment)
+persistentRegion ! Get(1)
+expectMsg(1)
+
+val counter1 = lastSender
+val shard = system.actorSelection(counter1.path.parent)
+val region = system.actorSelection(counter1.path.parent.parent)
+
+//Create and increment counter 13
+persistentRegion ! EntryEnvelope(13, Increment)
+persistentRegion ! Get(13)
+expectMsg(1)
+
+val counter13 = lastSender
+
+counter1.path.parent should be(counter13.path.parent)
+
+//Send the shard the passivate message from the counter
+shard.tell(Passivate(Stop), counter1)
+awaitAssert({
+//Check counter 1 is dead
+counter1 ! Identify(1)
+expectMsg(1 second, "Entry 1 was still around", ActorIdentity(1, None))
+}, 5 second, 500 millis)
+
+//Stop the shard cleanly
+region ! HandOff("1")
+expectMsg(10 seconds, "ShardStopped not received", ShardStopped("1"))
+awaitAssert({
+shard ! Identify(2)
+expectMsg(1 second, "Shard was still around", ActorIdentity(2, None))
+}, 5 seconds, 500 millis)
+}
+
+enterBarrier("shard-shutdown-12")
+
+runOn(fourth) {
+//Force the shard back up
+persistentRegion ! Get(25)
+expectMsg(0)
+
+val shard = lastSender.path.parent
+
+//Check counter 1 is still dead
+system.actorSelection(shard / "1") ! Identify(3)
+receiveOne(1 second) should be(ActorIdentity(3, None))
+
+//Check counter 13 is alive again
+system.actorSelection(shard / "13") ! Identify(4)
+receiveOne(1 second) match {
+case ActorIdentity(4, location) ⇒
+location should not be (None)
+}
+}
+
+enterBarrier("after-12")
+}
+
+"restart entries which stop without passivating" in within(50.seconds) {
+runOn(third, fourth) {
+persistentRegion
+}
+enterBarrier("cluster-started-12")
+
+runOn(third) {
+//Create and increment counter 1
+persistentRegion ! EntryEnvelope(1, Increment)
+persistentRegion ! Get(1)
+expectMsg(2)
+
+val counter1 = system.actorSelection(lastSender.path)
+
+counter1 ! Stop
+
+awaitAssert({
+counter1 ! Identify(1)
+receiveOne(1 second) match {
+case ActorIdentity(1, location) ⇒
+location should not be (None)
+}
+}, 5.seconds, 500.millis)
+
+}
+
+enterBarrier("after-13")
+}
+
+"be migrated to new regions upon region failure" in within(50.seconds) {
+lazy val migrationRegion: ActorRef = ClusterSharding(system).start(
+typeName = "AutoMigrateRegionTest",
+entryProps = Some(Props[Counter]),
+rememberEntries = true,
+idExtractor = idExtractor,
+shardResolver = shardResolver)
+
+//Start only one region, and force an entry onto that region
+runOn(third) {
+migrationRegion ! EntryEnvelope(1, Increment)
+}
+enterBarrier("shard1-region3")
+
+//Start another region and test it talks to node 3
+runOn(fourth) {
+migrationRegion ! EntryEnvelope(1, Increment)
+
+migrationRegion ! Get(1)
+expectMsg(2)
+lastSender.path should be(node(third) / "user" / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
+
+//Kill region 3
+system.actorSelection(lastSender.path.parent.parent) ! PoisonPill
+}
+enterBarrier("region4-up")
+
+//Wait for migration to happen
+Thread sleep 2500
+
+//Test the shard, thus counter was moved onto node 4 and started.
+runOn(fourth) {
+val counter1 = system.actorSelection(system / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
+counter1 ! Identify(1)
+receiveOne(1 second) match {
+case ActorIdentity(1, location) ⇒
+location should not be (None)
+}
+
+counter1 ! Get(1)
+expectMsg(2)
+}
+
+enterBarrier("after-14")
+}
+
+"ensure rebalance restarts shards" in within(50.seconds) {
+runOn(fourth) {
+for (i ← 2 to 12) {
+persistentRegion ! EntryEnvelope(i, Increment)
+}
+
+for (i ← 2 to 12) {
+persistentRegion ! Get(i)
+expectMsg(1)
+}
+}
+enterBarrier("entries-started")
+
+runOn(fifth) {
+persistentRegion
+}
+enterBarrier("fifth-joined-shard")
+
+runOn(fifth) {
+var count = 0
+
+for (n ← 2 to 12) {
+var entry = system.actorSelection(system / "sharding" / "PersistentCounter" / (n % 12).toString / n.toString)
+entry ! Identify(n)
+receiveOne(1 second) match {
+case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1
+case ActorIdentity(id, None) ⇒ //Not on the fifth shard
+}
+}
+
+assert(count >= 3, s"Not enough entries migrated, only ${count}")
+}
+
+enterBarrier("after-15")
+}
+}
 }
@@ -63,7 +63,7 @@ public class ClusterShardingTest {
 //#counter-extractor
 
 //#counter-start
-ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class),
+ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class), false,
 messageExtractor);
 //#counter-start
 
@@ -111,12 +111,12 @@ public class ClusterShardingTest {
 }
 
 int count = 0;
 
-// getSelf().path().parent().name() is the type name (utf-8 URL-encoded)
+// getSelf().path().parent().parent().name() is the type name (utf-8 URL-encoded)
 // getSelf().path().name() is the entry identifier (utf-8 URL-encoded)
 @Override
 public String persistenceId() {
-return getSelf().path().parent().name() + "-" + getSelf().path().name();
+return getSelf().path().parent().parent().name() + "-" + getSelf().path().name();
 }
 
 @Override
@@ -70,6 +70,14 @@ can be retrieved by calling ``channel.socket``. This allows for accessing new NI
 A new class ``DatagramChannelCreator`` which extends ``SocketOption`` has been added. ``DatagramChannelCreator`` can be used for
 custom ``DatagramChannel`` creation logic. This allows for opening IPv6 multicast datagram channels.
 
+Cluster Sharding Entry Path Change
+==================================
+
+Previously in ``2.3.x`` entries were direct children of the local ``ShardRegion``. In the examples the ``persistenceId`` of entries
+used ``self.path.parent.name`` to include the sharding type name.
+
+In ``2.4.x`` entries are now children of a ``Shard``, which in turn is a child of the local ``ShardRegion``. To include the type name
+in the ``persistenceId`` it is now accessed via ``self.path.parent.parent.name`` from each entry.
+
 Removed Deprecated Features
 ===========================
 
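For a persistent entry the migration amounts to one extra ``parent`` step when deriving the ``persistenceId``; a sketch mirroring the spec change above (the stubbed handlers are illustrative only)::

   import akka.persistence.PersistentActor

   class Counter extends PersistentActor {
     // 2.3.x built the same id from self.path.parent.name; in 2.4.x the type
     // name sits one level further up because the Shard is now in between.
     override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name

     override def receiveRecover: Receive = { case _ ⇒ }
     override def receiveCommand: Receive = { case _ ⇒ }
   }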
@@ -91,7 +99,7 @@ The following, previously deprecated, features have been removed:
 in the way that was introduced in Akka 2.3.
 
 * Timeout constructor without unit
 
 * JavaLoggingEventHandler, replaced by JavaLogger
 
 * UntypedActorFactory
@@ -103,7 +111,7 @@ Slf4j logging filter
 
 If you use ``Slf4jLogger`` you should add the following configuration::
 
   akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
 
 It will filter the log events using the backend configuration (e.g. logback.xml) before
 they are published to the event bus.