Merge pull request #17698 from akka/wip-17364-sharding-props-patriknw

Various small API improvements of Cluster Sharding
Patrik Nordwall 2015-06-16 15:27:42 +02:00
commit 1eaebcedb8
11 changed files with 913 additions and 700 deletions

View file

@ -14,44 +14,60 @@ akka.cluster.sharding {
# e.g. '/user/sharding'
guardian-name = sharding
# Specifies that entities runs on cluster nodes with a specific role.
# If the role is not specified (or empty) all nodes in the cluster are used.
role = ""
# When this is set to 'on' the active entity actors will automatically be restarted
# upon Shard restart, i.e. if the Shard is started on a different ShardRegion
# due to rebalance or crash.
remember-entities = off
# If the coordinator can't store state changes it will be stopped
# and started again after this duration.
coordinator-failure-backoff = 10 s
# Start the coordinator singleton manager on members tagged with this role.
# All members are used if undefined or empty.
# ShardRegion actor is started in proxy only mode on nodes that are not tagged
# with this role.
role = ""
# The ShardRegion retries registration and shard location requests to the
# ShardCoordinator with this interval if it does not reply.
retry-interval = 2 s
# Maximum number of messages that are buffered by a ShardRegion actor.
buffer-size = 100000
# Timeout of the shard rebalancing process.
handoff-timeout = 60 s
# Time given to a region to acknowdge it's hosting a shard.
# Time given to a region to acknowledge it's hosting a shard.
shard-start-timeout = 10 s
# If the shard can't store state changes it will retry the action
# again after this duration. Any messages sent to an affected entry
# again after this duration. Any messages sent to an affected entity
# will be buffered until the state change is processed
shard-failure-backoff = 10 s
# If the shard is remembering entries and an entry stops itself without
# using passivate. The entry will be restarted after this duration or when
# If the shard is remembering entities and an entity stops itself without
# using passivate, the entity will be restarted after this duration or when
# the next message for it is received, whichever occurs first.
entry-restart-backoff = 10 s
entity-restart-backoff = 10 s
# Rebalance check is performed periodically with this interval.
rebalance-interval = 10 s
# How often the coordinator saves persistent snapshots, which are
# used to reduce recovery times
snapshot-interval = 3600 s
# Absolute path to the journal plugin configuration entity that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default journal plugin is used. Note that this is not related to
# persistence used by the entity actors.
journal-plugin-id = ""
# Absolute path to the snapshot plugin configuration entity that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default snapshot plugin is used. Note that this is not related to
# persistence used by the entity actors.
snapshot-plugin-id = ""
# The coordinator saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
snapshot-after = 1000
# Setting for the default shard allocation strategy
least-shard-allocation-strategy {
@ -62,5 +78,8 @@ akka.cluster.sharding {
# The number of ongoing rebalancing processes is limited to this number.
max-simultaneous-rebalance = 3
}
# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
coordinator-singleton = ${akka.cluster.singleton}
}
# //#sharding-ext-config
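
These values can be overridden per entity type before they are turned into a ClusterShardingSettings instance. A minimal sketch, following the same ConfigFactory fallback pattern the updated test specs use (the system name and the chosen override values are illustrative only):

    import akka.actor.ActorSystem
    import akka.cluster.sharding.ClusterShardingSettings
    import com.typesafe.config.ConfigFactory

    val system = ActorSystem("ClusterSystem")

    // Tune a few values for this entity type and fall back to the
    // system-wide akka.cluster.sharding defaults for everything else.
    val tunedConfig = ConfigFactory.parseString("""
      retry-interval = 1s
      buffer-size = 1000
      """).withFallback(system.settings.config.getConfig("akka.cluster.sharding"))

    val settings = ClusterShardingSettings(tunedConfig)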

View file

@ -0,0 +1,139 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.sharding
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import akka.actor.NoSerializationVerificationNeeded
import com.typesafe.config.Config
import akka.cluster.singleton.ClusterSingletonManagerSettings
object ClusterShardingSettings {
/**
* Create settings from the default configuration
* `akka.cluster.sharding`.
*/
def apply(system: ActorSystem): ClusterShardingSettings =
apply(system.settings.config.getConfig("akka.cluster.sharding"))
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.sharding`.
*/
def apply(config: Config): ClusterShardingSettings = {
val tuningParameters = new TuningParameters(
coordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis,
retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis,
bufferSize = config.getInt("buffer-size"),
handOffTimeout = config.getDuration("handoff-timeout", MILLISECONDS).millis,
shardStartTimeout = config.getDuration("shard-start-timeout", MILLISECONDS).millis,
shardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis,
entityRestartBackoff = config.getDuration("entity-restart-backoff", MILLISECONDS).millis,
rebalanceInterval = config.getDuration("rebalance-interval", MILLISECONDS).millis,
snapshotAfter = config.getInt("snapshot-after"),
leastShardAllocationRebalanceThreshold =
config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
leastShardAllocationMaxSimultaneousRebalance =
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"))
val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton"))
new ClusterShardingSettings(
role = roleOption(config.getString("role")),
rememberEntities = config.getBoolean("remember-entities"),
journalPluginId = config.getString("journal-plugin-id"),
snapshotPluginId = config.getString("snapshot-plugin-id"),
tuningParameters,
coordinatorSingletonSettings)
}
/**
* Java API: Create settings from the default configuration
* `akka.cluster.sharding`.
*/
def create(system: ActorSystem): ClusterShardingSettings = apply(system)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.sharding`.
*/
def create(config: Config): ClusterShardingSettings = apply(config)
/**
* INTERNAL API
*/
private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
class TuningParameters(
val coordinatorFailureBackoff: FiniteDuration,
val retryInterval: FiniteDuration,
val bufferSize: Int,
val handOffTimeout: FiniteDuration,
val shardStartTimeout: FiniteDuration,
val shardFailureBackoff: FiniteDuration,
val entityRestartBackoff: FiniteDuration,
val rebalanceInterval: FiniteDuration,
val snapshotAfter: Int,
val leastShardAllocationRebalanceThreshold: Int,
val leastShardAllocationMaxSimultaneousRebalance: Int)
}
/**
* @param role specifies that this entity type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntities true if active entity actors shall be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param journalPluginId Absolute path to the journal plugin configuration entity that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* journal plugin is used. Note that this is not related to persistence used by the entity
* actors.
* @param snapshotPluginId Absolute path to the snapshot plugin configuration entity that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* snapshot plugin is used. Note that this is not related to persistence used by the entity
* actors.
* @param tuningParameters additional tuning parameters, see descriptions in reference.conf
*/
final class ClusterShardingSettings(
val role: Option[String],
val rememberEntities: Boolean,
val journalPluginId: String,
val snapshotPluginId: String,
val tuningParameters: ClusterShardingSettings.TuningParameters,
val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded {
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role)
def withRememberEntities(rememberEntities: Boolean): ClusterShardingSettings =
copy(rememberEntities = rememberEntities)
def withJournalPluginId(journalPluginId: String): ClusterShardingSettings =
copy(journalPluginId = journalPluginId)
def withSnapshotPluginId(snapshotPluginId: String): ClusterShardingSettings =
copy(snapshotPluginId = snapshotPluginId)
def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings =
copy(tuningParameters = tuningParameters)
def withCoordinatorSingletonSettings(coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings =
copy(coordinatorSingletonSettings = coordinatorSingletonSettings)
private def copy(role: Option[String] = role,
rememberEntities: Boolean = rememberEntities,
journalPluginId: String = journalPluginId,
snapshotPluginId: String = snapshotPluginId,
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings =
new ClusterShardingSettings(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
tuningParameters,
coordinatorSingletonSettings)
}
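
The fluent with* methods above make it possible to start from the configured defaults and amend individual settings in code. A small sketch (the role name and values are illustrative):

    val settings = ClusterShardingSettings(system)
      .withRole("backend")          // run entities only on nodes tagged with this role
      .withRememberEntities(true)   // restart entities after shard migration or crash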

View file

@ -52,11 +52,11 @@ object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig {
}
}
val idExtractor: ShardRegion.IdExtractor = {
val extractEntityId: ShardRegion.ExtractEntityId = {
case id: Int ⇒ (id.toString, id)
}
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match {
case id: Int ⇒ id.toString
}
@ -134,11 +134,10 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
def startSharding(): Unit = {
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Some(Props[Entity]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver,
entityProps = Props[Entity],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId,
allocationStrategy = TestAllocationStrategy(allocator),
handOffStopMessage = PoisonPill)
}

View file

@ -62,12 +62,12 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
}
}
val idExtractor: ShardRegion.IdExtractor = {
val extractEntityId: ShardRegion.ExtractEntityId = {
case m @ Get(id) ⇒ (id, m)
case m @ Add(id, _) ⇒ (id, m)
}
val shardResolver: ShardRegion.ShardResolver = {
val extractShardId: ShardRegion.ExtractShardId = {
case Get(id) ⇒ id.charAt(0).toString
case Add(id, _) ⇒ id.charAt(0).toString
}
@ -111,11 +111,10 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
def startSharding(): Unit = {
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Some(Props[Entity]),
roleOverride = None,
rememberEntries = true,
idExtractor = idExtractor,
shardResolver = shardResolver)
entityProps = Props[Entity],
settings = ClusterShardingSettings(system).withRememberEntities(true),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")
@ -184,17 +183,17 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
runOn(first) {
region ! Get("21")
expectMsg(Value("21", 3))
val entry21 = lastSender
val shard2 = system.actorSelection(entry21.path.parent)
val entity21 = lastSender
val shard2 = system.actorSelection(entity21.path.parent)
//Test the ShardCoordinator allocating shards during a journal failure
region ! Add("30", 3)
//Test the Shard starting entries and persisting during a journal failure
//Test the Shard starting entities and persisting during a journal failure
region ! Add("11", 1)
//Test the Shard passivate works during a journal failure
shard2.tell(Passivate(PoisonPill), entry21)
shard2.tell(Passivate(PoisonPill), entity21)
region ! Add("21", 1)
region ! Get("21")

View file

@ -56,11 +56,11 @@ object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig {
}
}
val idExtractor: ShardRegion.IdExtractor = {
val extractEntityId: ShardRegion.ExtractEntityId = {
case id: Int ⇒ (id.toString, id)
}
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match {
case id: Int ⇒ id.toString
}
@ -121,11 +121,10 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Some(Props[Entity]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver,
entityProps = Props[Entity],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId,
allocationStrategy,
handOffStopMessage = StopEntity)
}

View file

@ -33,7 +33,7 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
val fourth = role("fourth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
@ -68,11 +68,11 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
}
}
val idExtractor: ShardRegion.IdExtractor = {
val extractEntityId: ShardRegion.ExtractEntityId = {
case m @ Ping(id) ⇒ (id, m)
}
val shardResolver: ShardRegion.ShardResolver = {
val extractShardId: ShardRegion.ExtractShardId = {
case Ping(id: String) ⇒ id.charAt(0).toString
}
@ -123,11 +123,10 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe
def startSharding(): Unit = {
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Some(Props[Entity]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
entityProps = Props[Entity],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")

View file

@ -5,6 +5,8 @@ package akka.cluster.sharding
import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff }
import akka.cluster.sharding.ShardRegion.Passivate
import akka.cluster.sharding.ShardRegion.GetCurrentRegions
import akka.cluster.sharding.ShardRegion.CurrentRegions
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
@ -48,11 +50,10 @@ object ClusterShardingSpec extends MultiNodeConfig {
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
akka.cluster.sharding {
role = backend
retry-interval = 1 s
handoff-timeout = 10 s
shard-start-timeout = 5s
entry-restart-backoff = 1s
entity-restart-backoff = 1s
rebalance-interval = 2 s
least-shard-allocation-strategy {
rebalance-threshold = 2
@ -69,7 +70,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
case object Increment
case object Decrement
final case class Get(counterId: Long)
final case class EntryEnvelope(id: Long, payload: Any)
final case class EntityEnvelope(id: Long, payload: Any)
case object Stop
final case class CounterChanged(delta: Int)
@ -80,7 +81,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
context.setReceiveTimeout(120.seconds)
// self.path.parent.parent.name is the type name (utf-8 URL-encoded)
// self.path.name is the entry identifier (utf-8 URL-encoded)
// self.path.name is the entity identifier (utf-8 URL-encoded)
override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name
var count = 0
@ -110,16 +111,16 @@ object ClusterShardingSpec extends MultiNodeConfig {
}
//#counter-actor
val idExtractor: ShardRegion.IdExtractor = {
case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
case msg @ Get(id) ⇒ (id.toString, msg)
val extractEntityId: ShardRegion.ExtractEntityId = {
case EntityEnvelope(id, payload) ⇒ (id.toString, payload)
case msg @ Get(id) ⇒ (id.toString, msg)
}
val numberOfShards = 12
val shardResolver: ShardRegion.ShardResolver = {
case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
case Get(id) ⇒ (id % numberOfShards).toString
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
case Get(id) ⇒ (id % numberOfShards).toString
}
}
@ -129,16 +130,16 @@ object ClusterShardingDocCode {
import ClusterShardingSpec._
//#counter-extractor
val idExtractor: ShardRegion.IdExtractor = {
case EntryEnvelope(id, payload) ⇒ (id.toString, payload)
case msg @ Get(id) ⇒ (id.toString, msg)
val extractEntityId: ShardRegion.ExtractEntityId = {
case EntityEnvelope(id, payload) ⇒ (id.toString, payload)
case msg @ Get(id) ⇒ (id.toString, msg)
}
val numberOfShards = 100
val shardResolver: ShardRegion.ShardResolver = {
case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
case Get(id) ⇒ (id % numberOfShards).toString
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
case Get(id) ⇒ (id % numberOfShards).toString
}
//#counter-extractor
@ -183,47 +184,57 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
def createCoordinator(): Unit = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
def coordinatorProps(rebalanceEnabled: Boolean) =
ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds,
rebalanceInterval = if (rebalanceEnabled) 2.seconds else 3600.seconds,
snapshotInterval = 3600.seconds, allocationStrategy)
def coordinatorProps(typeName: String, rebalanceEnabled: Boolean) = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
val cfg = ConfigFactory.parseString(s"""
handoff-timeout = 10s
shard-start-timeout = 10s
rebalance-interval = ${if (rebalanceEnabled) "2s" else "3600s"}
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg)
ShardCoordinator.props(typeName, settings, allocationStrategy)
}
List("counter", "rebalancingCounter", "PersistentCounterEntries", "AnotherPersistentCounter",
"PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { coordinatorName
val rebalanceEnabled = coordinatorName.toLowerCase.startsWith("rebalancing")
List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter",
"PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { typeName
val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing")
system.actorOf(ClusterSingletonManager.props(
singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps(rebalanceEnabled)),
singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds,
coordinatorProps(typeName, rebalanceEnabled)),
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(system)),
name = coordinatorName + "Coordinator")
name = typeName + "Coordinator")
}
}
def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = system.actorOf(ShardRegion.props(
typeName = typeName,
entryProps = Props[Counter],
role = None,
coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
retryInterval = 1.second,
shardFailureBackoff = 1.second,
entryRestartBackoff = 1.second,
snapshotInterval = 1.hour,
bufferSize = 1000,
rememberEntries = rememberEntries,
idExtractor = idExtractor,
shardResolver = shardResolver,
handOffStopMessage = PoisonPill),
name = typeName + "Region")
def createRegion(typeName: String, rememberEntities: Boolean): ActorRef = {
val cfg = ConfigFactory.parseString("""
retry-interval = 1s
shard-failure-backoff = 1s
entity-restart-backoff = 1s
buffer-size = 1000
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg)
.withRememberEntities(rememberEntities)
system.actorOf(ShardRegion.props(
typeName = typeName,
entityProps = Props[Counter],
settings = settings,
coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
extractEntityId = extractEntityId,
extractShardId = extractShardId,
handOffStopMessage = PoisonPill),
name = typeName + "Region")
}
lazy val region = createRegion("counter", rememberEntries = false)
lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntries = false)
lazy val region = createRegion("counter", rememberEntities = false)
lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntities = false)
lazy val persistentEntriesRegion = createRegion("PersistentCounterEntries", rememberEntries = true)
lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntries = true)
lazy val persistentRegion = createRegion("PersistentCounter", rememberEntries = true)
lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntries = true)
lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntries = true)
lazy val persistentEntitiesRegion = createRegion("PersistentCounterEntities", rememberEntities = true)
lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntities = true)
lazy val persistentRegion = createRegion("PersistentCounter", rememberEntities = true)
lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntities = true)
lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntities = true)
"Cluster sharding" must {
@ -248,12 +259,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
join(first, first)
runOn(first) {
region ! EntryEnvelope(1, Increment)
region ! EntryEnvelope(1, Increment)
region ! EntryEnvelope(1, Increment)
region ! EntryEnvelope(1, Decrement)
region ! EntityEnvelope(1, Increment)
region ! EntityEnvelope(1, Increment)
region ! EntityEnvelope(1, Increment)
region ! EntityEnvelope(1, Decrement)
region ! Get(1)
expectMsg(2)
region ! GetCurrentRegions
expectMsg(CurrentRegions(Set(Cluster(system).selfAddress)))
}
enterBarrier("after-2")
@ -263,15 +277,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
join(second, first)
runOn(second) {
region ! EntryEnvelope(2, Increment)
region ! EntryEnvelope(2, Increment)
region ! EntryEnvelope(2, Increment)
region ! EntryEnvelope(2, Decrement)
region ! EntityEnvelope(2, Increment)
region ! EntityEnvelope(2, Increment)
region ! EntityEnvelope(2, Increment)
region ! EntityEnvelope(2, Decrement)
region ! Get(2)
expectMsg(2)
region ! EntryEnvelope(11, Increment)
region ! EntryEnvelope(12, Increment)
region ! EntityEnvelope(11, Increment)
region ! EntityEnvelope(12, Increment)
region ! Get(11)
expectMsg(1)
region ! Get(12)
@ -279,7 +293,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
enterBarrier("second-update")
runOn(first) {
region ! EntryEnvelope(2, Increment)
region ! EntityEnvelope(2, Increment)
region ! Get(2)
expectMsg(3)
lastSender.path should ===(node(second) / "user" / "counterRegion" / "2" / "2")
@ -298,19 +312,22 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
region ! Get(2)
expectMsg(3)
lastSender.path should ===(region.path / "2" / "2")
region ! GetCurrentRegions
expectMsg(CurrentRegions(Set(Cluster(system).selfAddress, node(first).address)))
}
enterBarrier("after-3")
}
"support passivation and activation of entries" in {
"support passivation and activation of entities" in {
runOn(second) {
region ! Get(2)
expectMsg(3)
region ! EntryEnvelope(2, ReceiveTimeout)
region ! EntityEnvelope(2, ReceiveTimeout)
// let the Passivate-Stop roundtrip begin to trigger buffering of subsequent messages
Thread.sleep(200)
region ! EntryEnvelope(2, Increment)
region ! EntityEnvelope(2, Increment)
region ! Get(2)
expectMsg(4)
}
@ -319,14 +336,17 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
"support proxy only mode" in within(10.seconds) {
runOn(second) {
val cfg = ConfigFactory.parseString("""
retry-interval = 1s
buffer-size = 1000
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg)
val proxy = system.actorOf(ShardRegion.proxyProps(
typeName = "counter",
role = None,
settings,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
bufferSize = 1000,
idExtractor = idExtractor,
shardResolver = shardResolver),
extractEntityId = extractEntityId,
extractShardId = extractShardId),
name = "regionProxy")
proxy ! Get(1)
@ -376,7 +396,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(third) {
for (_ ← 1 to 10)
region ! EntryEnvelope(3, Increment)
region ! EntityEnvelope(3, Increment)
region ! Get(3)
expectMsg(10)
lastSender.path should ===(region.path / "3" / "3") // local
@ -385,7 +405,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(fourth) {
for (_ ← 1 to 20)
region ! EntryEnvelope(4, Increment)
region ! EntityEnvelope(4, Increment)
region ! Get(4)
expectMsg(20)
lastSender.path should ===(region.path / "4" / "4") // local
@ -393,12 +413,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
enterBarrier("fourth-update")
runOn(first) {
region ! EntryEnvelope(3, Increment)
region ! EntityEnvelope(3, Increment)
region ! Get(3)
expectMsg(11)
lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3")
region ! EntryEnvelope(4, Increment)
region ! EntityEnvelope(4, Increment)
region ! Get(4)
expectMsg(21)
lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4")
@ -455,7 +475,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(fourth) {
for (n ← 1 to 10) {
rebalancingRegion ! EntryEnvelope(n, Increment)
rebalancingRegion ! EntityEnvelope(n, Increment)
rebalancingRegion ! Get(n)
expectMsg(1)
}
@ -490,19 +510,17 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
//#counter-start
val counterRegion: ActorRef = ClusterSharding(system).start(
typeName = "Counter",
entryProps = Some(Props[Counter]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
entityProps = Props[Counter],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
//#counter-start
ClusterSharding(system).start(
typeName = "AnotherCounter",
entryProps = Some(Props[Counter]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
entityProps = Props[Counter],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
enterBarrier("extension-started")
runOn(fifth) {
@ -511,12 +529,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
counterRegion ! Get(123)
expectMsg(0)
counterRegion ! EntryEnvelope(123, Increment)
counterRegion ! EntityEnvelope(123, Increment)
counterRegion ! Get(123)
expectMsg(1)
//#counter-usage
ClusterSharding(system).shardRegion("AnotherCounter") ! EntryEnvelope(123, Decrement)
ClusterSharding(system).shardRegion("AnotherCounter") ! EntityEnvelope(123, Decrement)
ClusterSharding(system).shardRegion("AnotherCounter") ! Get(123)
expectMsg(-1)
}
@ -526,7 +544,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
// sixth is a frontend node, i.e. proxy only
runOn(sixth) {
for (n ← 1000 to 1010) {
ClusterSharding(system).shardRegion("Counter") ! EntryEnvelope(n, Increment)
ClusterSharding(system).shardRegion("Counter") ! EntityEnvelope(n, Increment)
ClusterSharding(system).shardRegion("Counter") ! Get(n)
expectMsg(1)
lastSender.path.address should not be (Cluster(system).selfAddress)
@ -540,11 +558,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(first) {
val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
typeName = "ApiTest",
entryProps = Some(Props[Counter]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
entityProps = Props[Counter],
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
val counterRegionViaGet: ActorRef = ClusterSharding(system).shardRegion("ApiTest")
@ -555,17 +572,17 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
"Persistent Cluster Shards" must {
"recover entries upon restart" in within(50.seconds) {
"recover entities upon restart" in within(50.seconds) {
runOn(third, fourth, fifth) {
persistentEntriesRegion
persistentEntitiesRegion
anotherPersistentRegion
}
enterBarrier("persistent-started")
runOn(third) {
//Create an increment counter 1
persistentEntriesRegion ! EntryEnvelope(1, Increment)
persistentEntriesRegion ! Get(1)
persistentEntitiesRegion ! EntityEnvelope(1, Increment)
persistentEntitiesRegion ! Get(1)
expectMsg(1)
//Shut down the shard and confirm it's dead
@ -583,7 +600,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}, 5 seconds, 500 millis)
//Get the path to where the shard now resides
persistentEntriesRegion ! Get(13)
persistentEntitiesRegion ! Get(13)
expectMsg(0)
//Check that counter 1 is now alive again, even though we have
@ -602,7 +619,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
//Check a second region does not share the same persistent shards
//Create a separate 13 counter
anotherPersistentRegion ! EntryEnvelope(13, Increment)
anotherPersistentRegion ! EntityEnvelope(13, Increment)
anotherPersistentRegion ! Get(13)
expectMsg(1)
@ -615,7 +632,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
enterBarrier("after-12")
}
"permanently stop entries which passivate" in within(15.seconds) {
"permanently stop entities which passivate" in within(15.seconds) {
runOn(third, fourth, fifth) {
persistentRegion
}
@ -623,7 +640,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(third) {
//Create and increment counter 1
persistentRegion ! EntryEnvelope(1, Increment)
persistentRegion ! EntityEnvelope(1, Increment)
persistentRegion ! Get(1)
expectMsg(1)
@ -632,7 +649,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
val region = system.actorSelection(counter1.path.parent.parent)
//Create and increment counter 13
persistentRegion ! EntryEnvelope(13, Increment)
persistentRegion ! EntityEnvelope(13, Increment)
persistentRegion ! Get(13)
expectMsg(1)
@ -651,7 +668,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
awaitAssert({
//Check counter 1 is dead
counter1.tell(Identify(1), probe1.ref)
probe1.expectMsg(1 second, "Entry 1 was still around", ActorIdentity(1, None))
probe1.expectMsg(1 second, "Entity 1 was still around", ActorIdentity(1, None))
}, 5 second, 500 millis)
//Stop the shard cleanly
@ -686,7 +703,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
enterBarrier("after-13")
}
"restart entries which stop without passivating" in within(50.seconds) {
"restart entities which stop without passivating" in within(50.seconds) {
runOn(third, fourth) {
persistentRegion
}
@ -694,7 +711,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
runOn(third) {
//Create and increment counter 1
persistentRegion ! EntryEnvelope(1, Increment)
persistentRegion ! EntityEnvelope(1, Increment)
persistentRegion ! Get(1)
expectMsg(2)
@ -714,9 +731,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
"be migrated to new regions upon region failure" in within(15.seconds) {
//Start only one region, and force an entry onto that region
//Start only one region, and force an entity onto that region
runOn(third) {
autoMigrateRegion ! EntryEnvelope(1, Increment)
autoMigrateRegion ! EntityEnvelope(1, Increment)
autoMigrateRegion ! Get(1)
expectMsg(1)
}
@ -724,7 +741,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
//Start another region and test it talks to node 3
runOn(fourth) {
autoMigrateRegion ! EntryEnvelope(1, Increment)
autoMigrateRegion ! EntityEnvelope(1, Increment)
autoMigrateRegion ! Get(1)
expectMsg(2)
@ -755,7 +772,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
"ensure rebalance restarts shards" in within(50.seconds) {
runOn(fourth) {
for (i ← 2 to 12) {
rebalancingPersistentRegion ! EntryEnvelope(i, Increment)
rebalancingPersistentRegion ! EntityEnvelope(i, Increment)
}
for (i ← 2 to 12) {
@ -763,7 +780,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
expectMsg(1)
}
}
enterBarrier("entries-started")
enterBarrier("entities-started")
runOn(fifth) {
rebalancingPersistentRegion
@ -774,8 +791,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
awaitAssert {
var count = 0
for (n ← 2 to 12) {
val entry = system.actorSelection(rebalancingPersistentRegion.path / (n % 12).toString / n.toString)
entry ! Identify(n)
val entity = system.actorSelection(rebalancingPersistentRegion.path / (n % 12).toString / n.toString)
entity ! Identify(n)
receiveOne(3 seconds) match {
case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1
case ActorIdentity(id, None) ⇒ //Not on the fifth shard

View file

@ -34,9 +34,9 @@ public class ClusterShardingTest {
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
@Override
public String entryId(Object message) {
if (message instanceof Counter.EntryEnvelope)
return String.valueOf(((Counter.EntryEnvelope) message).id);
public String entityId(Object message) {
if (message instanceof Counter.EntityEnvelope)
return String.valueOf(((Counter.EntityEnvelope) message).id);
else if (message instanceof Counter.Get)
return String.valueOf(((Counter.Get) message).counterId);
else
@ -44,9 +44,9 @@ public class ClusterShardingTest {
}
@Override
public Object entryMessage(Object message) {
if (message instanceof Counter.EntryEnvelope)
return ((Counter.EntryEnvelope) message).payload;
public Object entityMessage(Object message) {
if (message instanceof Counter.EntityEnvelope)
return ((Counter.EntityEnvelope) message).payload;
else
return message;
}
@ -54,8 +54,8 @@ public class ClusterShardingTest {
@Override
public String shardId(Object message) {
int numberOfShards = 100;
if (message instanceof Counter.EntryEnvelope) {
long id = ((Counter.EntryEnvelope) message).id;
if (message instanceof Counter.EntityEnvelope) {
long id = ((Counter.EntityEnvelope) message).id;
return String.valueOf(id % numberOfShards);
} else if (message instanceof Counter.Get) {
long id = ((Counter.Get) message).counterId;
@ -70,15 +70,16 @@ public class ClusterShardingTest {
//#counter-start
Option<String> roleOption = Option.none();
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor);
Props.create(Counter.class), settings, messageExtractor);
//#counter-start
//#counter-usage
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
counterRegion.tell(new Counter.Get(123), getSelf());
counterRegion.tell(new Counter.EntryEnvelope(123,
counterRegion.tell(new Counter.EntityEnvelope(123,
Counter.CounterOp.INCREMENT), getSelf());
counterRegion.tell(new Counter.Get(123), getSelf());
//#counter-usage
@ -99,11 +100,11 @@ public class ClusterShardingTest {
}
}
public static class EntryEnvelope {
public static class EntityEnvelope {
final public long id;
final public Object payload;
public EntryEnvelope(long id, Object payload) {
public EntityEnvelope(long id, Object payload) {
this.id = id;
this.payload = payload;
}
@ -120,7 +121,7 @@ public class ClusterShardingTest {
int count = 0;
// getSelf().path().parent().parent().name() is the type name (utf-8 URL-encoded)
// getSelf().path().name() is the entry identifier (utf-8 URL-encoded)
// getSelf().path().name() is the entity identifier (utf-8 URL-encoded)
@Override
public String persistenceId() {
return getSelf().path().parent().parent().name() + "-" + getSelf().path().name();

View file

@ -227,6 +227,23 @@ The classes changed package name from ``akka.contrib.pattern`` to ``akka.cluster
The configuration properties changed name to ``akka.cluster.sharding``.
ClusterSharding construction
============================
Several parameters of the ``start`` method of the ``ClusterSharding`` extension are now defined
in a settings object ``ClusterShardingSettings``.
It can be created from system configuration properties and also amended programmatically.
These settings can be defined differently per entity type if needed.
Starting the ``ShardRegion`` in proxy mode is now done with the ``startProxy`` method
of the ``ClusterSharding`` extension instead of the optional ``entryProps`` parameter.
Entry was renamed to Entity, for example in the ``MessageExtractor`` in the Java API
and the ``EntityId`` type in the Scala API.
The ``idExtractor`` function was renamed to ``extractEntityId`` and the ``shardResolver`` function
was renamed to ``extractShardId``.
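For example, a Scala ``start`` call with the new API may look like this, with ``Counter``,
``extractEntityId`` and ``extractShardId`` defined by the application::

    ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props[Counter],
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)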
ClusterSingletonManager and ClusterSingletonProxy construction
==============================================================

View file

@ -8,131 +8,132 @@ be able to interact with them using their logical identifier, but without having
their physical location in the cluster, which might also change over time.
It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology.
Here we call these actors "entries". These actors typically have persistent (durable) state,
Here we call these actors "entities". These actors typically have persistent (durable) state,
but this feature is not limited to actors with persistent state.
Cluster sharding is typically used when you have many stateful actors that together consume
more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
it might be easier to run them on a :ref:`cluster-singleton` node.
In this context sharding means that actors with an identifier, so called entries,
can be automatically distributed across multiple nodes in the cluster. Each entry
actor runs only at one place, and messages can be sent to the entry without requiring
In this context sharding means that actors with an identifier, so called entities,
can be automatically distributed across multiple nodes in the cluster. Each entity
actor runs only at one place, and messages can be sent to the entity without requiring
the sender to know the location of the destination actor. This is achieved by sending
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
to route the message with the entry id to the final destination.
to route the message with the entity id to the final destination.
An Example in Java
------------------
This is how an entry actor may look like:
This is what an entity actor may look like:
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-actor
The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state.
It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if it is valuable.
Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node
in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start``
in the cluster, supposed to register the supported entity types with the ``ClusterSharding.start``
method. ``ClusterSharding.start`` gives you the reference which you can pass along.
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-start
The ``messageExtractor`` defines application specific methods to extract the entry
The ``messageExtractor`` defines application specific methods to extract the entity
identifier and the shard identifier from incoming messages.
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-extractor
This example illustrates two different ways to define the entry identifier in the messages:
This example illustrates two different ways to define the entity identifier in the messages:
* The ``Get`` message includes the identifier itself.
* The ``EntryEnvelope`` holds the identifier, and the actual message that is
sent to the entry actor is wrapped in the envelope.
* The ``EntityEnvelope`` holds the identifier, and the actual message that is
sent to the entity actor is wrapped in the envelope.
Note how these two messages types are handled in the ``entryId`` and ``entryMessage`` methods shown above.
The message sent to the entry actor is what ``entryMessage`` returns and that makes it possible to unwrap envelopes
Note how these two message types are handled in the ``entityId`` and ``entityMessage`` methods shown above.
The message sent to the entity actor is what ``entityMessage`` returns and that makes it possible to unwrap envelopes
if needed.
A shard is a group of entries that will be managed together. The grouping is defined by the
``shardResolver`` function shown above. For a specific entry identifier the shard identifier must always
be the same. Otherwise the entry actor might accidentally be started in several places at the same time.
A shard is a group of entities that will be managed together. The grouping is defined by the
``extractShardId`` function shown above. For a specific entity identifier the shard identifier must always
be the same. Otherwise the entity actor might accidentally be started in several places at the same time.
Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
i.e. same amount of entries in each shard. As a rule of thumb, the number of shards should be a factor ten greater
i.e. the same number of entities in each shard. As a rule of thumb, the number of shards should be a factor ten greater
than the planned maximum number of cluster nodes. Fewer shards than nodes will result in some nodes
not hosting any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing
overhead, and increased latency because the coordinator is involved in the routing of the first message for each
shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
all nodes in the cluster.
A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo
number of shards.
A simple sharding algorithm that works fine in most cases is to take the absolute value of the ``hashCode`` of
the entity identifier modulo the number of shards. As a convenience this is provided by the
``ShardRegion.HashCodeMessageExtractor``.
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
The ``ShardRegion`` will lookup the location of the shard for the entry if it does not already know its location. It will
delegate the message to the right node and it will create the entry actor on demand, i.e. when the
first message for a specific entry is delivered.
Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
The ``ShardRegion`` will look up the location of the shard for the entity if it does not already know its location. It will
delegate the message to the right node and it will create the entity actor on demand, i.e. when the
first message for a specific entity is delivered.
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-usage
An Example in Scala
-------------------
This is how an entry actor may look like:
This is what an entity actor may look like:
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-actor
The above actor uses event sourcing and the support provided in ``PersistentActor`` to store its state.
It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if it is valuable.
Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node
in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start``
in the cluster, supposed to register the supported entity types with the ``ClusterSharding.start``
method. ``ClusterSharding.start`` gives you the reference which you can pass along.
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-start
The ``idExtractor`` and ``shardResolver`` are two application specific functions to extract the entry
The ``extractEntityId`` and ``extractShardId`` are two application specific functions to extract the entity
identifier and the shard identifier from incoming messages.
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-extractor
This example illustrates two different ways to define the entry identifier in the messages:
This example illustrates two different ways to define the entity identifier in the messages:
* The ``Get`` message includes the identifier itself.
* The ``EntryEnvelope`` holds the identifier, and the actual message that is
sent to the entry actor is wrapped in the envelope.
* The ``EntityEnvelope`` holds the identifier, and the actual message that is
sent to the entity actor is wrapped in the envelope.
Note how these two messages types are handled in the ``idExtractor`` function shown above.
The message sent to the entry actor is the second part of the tuple return by the ``idExtractor`` and that makes it
Note how these two message types are handled in the ``extractEntityId`` function shown above.
The message sent to the entity actor is the second part of the tuple returned by ``extractEntityId`` and that makes it
possible to unwrap envelopes if needed.
A shard is a group of entries that will be managed together. The grouping is defined by the
``shardResolver`` function shown above. For a specific entry identifier the shard identifier must always
A shard is a group of entities that will be managed together. The grouping is defined by the
``extractShardId`` function shown above. For a specific entity identifier the shard identifier must always
be the same.
Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
i.e. same amount of entries in each shard. As a rule of thumb, the number of shards should be a factor ten greater
i.e. the same number of entities in each shard. As a rule of thumb, the number of shards should be a factor ten greater
than the planned maximum number of cluster nodes. Fewer shards than nodes will result in some nodes
not hosting any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing
overhead, and increased latency because the coordinator is involved in the routing of the first message for each
shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
all nodes in the cluster.
A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo
A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entity identifier modulo
the number of shards.
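One possible sketch of such a function for string entity identifiers (the helper below is
illustrative, not part of the API)::

    val numberOfShards = 100

    def shardId(entityId: String): String =
      // abs guards against negative hashCode values
      (math.abs(entityId.hashCode) % numberOfShards).toString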
Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
The ``ShardRegion`` will lookup the location of the shard for the entry if it does not already know its location. It will
delegate the message to the right node and it will create the entry actor on demand, i.e. when the
first message for a specific entry is delivered.
Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
The ``ShardRegion`` will look up the location of the shard for the entity if it does not already know its location. It will
delegate the message to the right node and it will create the entity actor on demand, i.e. when the
first message for a specific entity is delivered.
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-usage
@ -144,16 +145,16 @@ How it works
The ``ShardRegion`` actor is started on each node in the cluster, or group of nodes
tagged with a specific role. The ``ShardRegion`` is created with two application specific
functions to extract the entry identifier and the shard identifier from incoming messages.
A shard is a group of entries that will be managed together. For the first message in a
functions to extract the entity identifier and the shard identifier from incoming messages.
A shard is a group of entities that will be managed together. For the first message in a
specific shard the ``ShardRegion`` requests the location of the shard from a central coordinator,
the ``ShardCoordinator``.
The ``ShardCoordinator`` decides which ``ShardRegion`` shall own the ``Shard`` and informs
that ``ShardRegion``. The region will confirm this request and create the ``Shard`` supervisor
as a child actor. The individual ``Entries`` will then be created when needed by the ``Shard``
as a child actor. The individual ``Entities`` will then be created when needed by the ``Shard``
actor. Incoming messages thus travel via the ``ShardRegion`` and the ``Shard`` to the target
``Entry``.
``Entity``.
If the shard home is another ``ShardRegion`` instance messages will be forwarded
to that ``ShardRegion`` instance instead. While resolving the location of a
@ -166,8 +167,8 @@ Scenario 1:
#. Incoming message M1 to ``ShardRegion`` instance R1.
#. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1.
#. C answers that the home of S1 is R1.
#. R1 creates child actor for the entry E1 and sends buffered messages for S1 to E1 child
#. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entry children as needed, and forwards messages to them.
#. R1 creates a child actor for the entity E1 and sends buffered messages for S1 to the E1 child
#. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entity children as needed, and forwards messages to them.
Scenario 2:
@ -178,7 +179,7 @@ Scenario 2:
#. All incoming messages for S2 which arrive at R1 can be handled by R1 without C. It forwards messages to R2.
#. R2 receives message for S2, ask C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2).
To make sure that at most one instance of a specific entry actor is running somewhere
To make sure that at most one instance of a specific entity actor is running somewhere
in the cluster it is important that all nodes have the same view of where the shards
are located. Therefore the shard allocation decisions are taken by the central
``ShardCoordinator``, which is running as a cluster singleton, i.e. one instance on
@ -191,19 +192,19 @@ allocates new shards to the ``ShardRegion`` with least number of previously allo
This strategy can be replaced by an application specific implementation.
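As a sketch, the parameters of the default strategy can also be set programmatically and the
strategy passed to ``ClusterSharding.start`` (entity props and extractor functions as in the
earlier examples)::

    val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(
      rebalanceThreshold = 2, maxSimultaneousRebalance = 1)

    ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props[Counter],
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId,
      allocationStrategy = allocationStrategy,
      handOffStopMessage = PoisonPill)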
To be able to use newly added members in the cluster the coordinator facilitates rebalancing
of shards, i.e. migrate entries from one node to another. In the rebalance process the
of shards, i.e. migrate entities from one node to another. In the rebalance process the
coordinator first notifies all ``ShardRegion`` actors that a handoff for a shard has started.
That means they will start buffering incoming messages for that shard, in the same way as if the
shard location is unknown. During the rebalance process the coordinator will not answer any
requests for the location of shards that are being rebalanced, i.e. local buffering will
continue until the handoff is completed. The ``ShardRegion`` responsible for the rebalanced shard
will stop all entries in that shard by sending the specified ``handOffStopMessage``
(default ``PoisonPill``) to them. When all entries have been terminated the ``ShardRegion``
owning the entries will acknowledge the handoff as completed to the coordinator.
will stop all entities in that shard by sending the specified ``handOffStopMessage``
(default ``PoisonPill``) to them. When all entities have been terminated the ``ShardRegion``
owning the entities will acknowledge the handoff as completed to the coordinator.
Thereafter the coordinator will reply to requests for the location of
the shard and thereby allocate a new home for the shard and then buffered messages in the
``ShardRegion`` actors are delivered to the new location. This means that the state of the entries
are not transferred or migrated. If the state of the entries are of importance it should be
``ShardRegion`` actors are delivered to the new location. This means that the state of the entities
is not transferred or migrated. If the state of the entities is of importance it should be
persistent (durable), e.g. with ``akka-persistence``, so that it can be recovered at the new
location.
@ -223,7 +224,7 @@ actor will take over and the state is recovered. During such a failure period sh
with known location are still available, while messages for new (unknown) shards
are buffered until the new ``ShardCoordinator`` becomes available.
As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entry
As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entity
actor the order of the messages is preserved. As long as the buffer limit is not reached
messages are delivered on a best effort basis, with at-most once delivery semantics,
in the same way as ordinary message sending. Reliable end-to-end messaging, with
@ -238,41 +239,42 @@ Proxy Only Mode
---------------
The ``ShardRegion`` actor can also be started in proxy only mode, i.e. it will not
host any entries itself, but knows how to delegate messages to the right location.
A ``ShardRegion`` starts in proxy only mode if the roles of the node does not include
the node role specified in ``akka.contrib.cluster.sharding.role`` config property
or if the specified `entryProps` is ``None`` / ``null``.
host any entities itself, but knows how to delegate messages to the right location.
A ``ShardRegion`` is started in proxy only mode with the ``ClusterSharding.startProxy`` method.
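A sketch of starting such a proxy, assuming ``startProxy`` takes the type name, an optional
role and the two message extraction functions::

    val counterProxy: ActorRef = ClusterSharding(system).startProxy(
      typeName = "Counter",
      role = None,
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)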
Passivation
-----------
If the state of the entries are persistent you may stop entries that are not used to
If the state of the entities is persistent you may stop entities that are not used, to
reduce memory consumption. This is done by the application specific implementation of
the entry actors for example by defining receive timeout (``context.setReceiveTimeout``).
If a message is already enqueued to the entry when it stops itself the enqueued message
the entity actors for example by defining receive timeout (``context.setReceiveTimeout``).
If a message is already enqueued to the entity when it stops itself the enqueued message
in the mailbox will be dropped. To support graceful passivation without losing such
messages the entry actor can send ``ShardRegion.Passivate`` to its parent ``Shard``.
The specified wrapped message in ``Passivate`` will be sent back to the entry, which is
messages the entity actor can send ``ShardRegion.Passivate`` to its parent ``Shard``.
The specified wrapped message in ``Passivate`` will be sent back to the entity, which is
then supposed to stop itself. Incoming messages will be buffered by the ``Shard``
between reception of ``Passivate`` and termination of the entry. Such buffered messages
are thereafter delivered to a new incarnation of the entry.
between reception of ``Passivate`` and termination of the entity. Such buffered messages
are thereafter delivered to a new incarnation of the entity.
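A rough sketch of an entity that passivates itself after being idle (the actor name and the
timeout are illustrative only)::

    import akka.actor.{ Actor, PoisonPill, ReceiveTimeout }
    import akka.cluster.sharding.ShardRegion.Passivate
    import scala.concurrent.duration._

    class SomeEntity extends Actor {
      // consider the entity idle after two minutes without messages
      context.setReceiveTimeout(120.seconds)

      def receive = {
        // ask the parent Shard to passivate instead of stopping abruptly
        case ReceiveTimeout ⇒ context.parent ! Passivate(stopMessage = PoisonPill)
        case msg            ⇒ // handle application messages here
      }
    }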
Remembering Entries
-------------------
Remembering Entities
--------------------
The list of entries in each ``Shard`` can be made persistent (durable) by setting
the ``rememberEntries`` flag to true when calling ``ClusterSharding.start``. When configured
to remember entries, whenever a ``Shard`` is rebalanced onto another node or recovers after a
crash it will recreate all the entries which were previously running in that ``Shard``. To
permanently stop entries, a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the
entry will be automatically restarted after the entry restart backoff specified in the configuration.
The list of entities in each ``Shard`` can be made persistent (durable) by setting
the ``rememberEntities`` flag to true in ``ClusterShardingSettings`` when calling
``ClusterSharding.start``. When configured to remember entities, whenever a ``Shard``
is rebalanced onto another node or recovers after a crash it will recreate all the
entities which were previously running in that ``Shard``. To permanently stop entities,
a ``Passivate`` message must be sent to the parent ``Shard``, otherwise the
entity will be automatically restarted after the entity restart backoff specified in
the configuration.
When ``rememberEntries`` is set to false, a ``Shard`` will not automatically restart any entries
after a rebalance or recovering from a crash. Entries will only be started once the first message
for that entry has been received in the ``Shard``. Entries will not be restarted if they stop without
When ``rememberEntities`` is set to false, a ``Shard`` will not automatically restart any entities
after a rebalance or recovering from a crash. Entities will only be started once the first message
for that entity has been received in the ``Shard``. Entities will not be restarted if they stop without
using a ``Passivate``.
Note that the state of the entries themselves will not be restored unless they have been made persistent,
Note that the state of the entities themselves will not be restored unless they have been made persistent,
e.g. with ``akka-persistence``.
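A sketch of enabling this when registering the entity type, with names as in the earlier
examples::

    ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props[Counter],
      settings = ClusterShardingSettings(system).withRememberEntities(true),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)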
Graceful Shutdown