ClusterSharding: automatically choose start or startProxy by a node role (#23934) (#24669)

This commit is contained in:
Patrik Nordwall 2018-03-07 05:17:56 +01:00 committed by Konrad `ktoso` Malawski
parent d55990fa64
commit a13f5cab00
7 changed files with 114 additions and 38 deletions

View file

@ -247,8 +247,8 @@ final class ClusterShardingSettings(
*/
@InternalApi
private[akka] def shouldHostShard(cluster: Cluster): Boolean =
(role.isEmpty || cluster.selfMember.roles(role.get)) &&
(dataCenter.isEmpty || cluster.selfMember.dataCenter.contains(dataCenter.get))
role.forall(cluster.selfMember.roles.contains) &&
dataCenter.forall(cluster.selfMember.dataCenter.contains)
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.option(role))

View file

@ -0,0 +1,2 @@
# #23952 automatically choose startProxy in ClusterSharding
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ClusterSharding.requireClusterRole")

View file

@ -7,7 +7,10 @@ import java.net.URLEncoder
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.concurrent.Await
import scala.util.control.NonFatal
import akka.actor.Actor
import akka.actor.ActorRef
@ -20,23 +23,19 @@ import akka.actor.ExtensionIdProvider
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.singleton.ClusterSingletonManager
import akka.pattern.BackoffSupervisor
import akka.util.ByteString
import akka.pattern.ask
import akka.dispatch.Dispatchers
import akka.cluster.ddata.ReplicatorSettings
import akka.cluster.ddata.Replicator
import scala.util.control.NonFatal
import akka.actor.Status
import akka.annotation.InternalApi
import akka.cluster.Cluster
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.annotation.InternalApi
import akka.cluster.ddata.Replicator
import akka.cluster.ddata.ReplicatorSettings
import akka.cluster.singleton.ClusterSingletonManager
import akka.dispatch.Dispatchers
import akka.event.Logging
import akka.pattern.BackoffSupervisor
import akka.pattern.ask
import akka.util.ByteString
/**
* This extension provides sharding functionality of actors in a cluster.
@ -158,8 +157,10 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv
*/
class ClusterSharding(system: ExtendedActorSystem) extends Extension {
import ClusterShardingGuardian._
import ShardCoordinator.ShardAllocationStrategy
import ShardCoordinator.LeastShardAllocationStrategy
import ShardCoordinator.ShardAllocationStrategy
private val log = Logging(system, this.getClass)
private val cluster = Cluster(system)
@ -177,16 +178,14 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
system.systemActorOf(Props[ClusterShardingGuardian].withDispatcher(dispatcher), guardianName)
}
private[akka] def requireClusterRole(role: Option[String]): Unit =
require(
role.forall(cluster.selfRoles.contains),
s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
/**
* Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor
* and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
* for this type can later be retrieved with the [[shardRegion]] method.
*
* This method will start a [[ShardRegion]] in proxy mode in case if there is no match between the roles of
* the current cluster node and the role specified in [[ClusterShardingSettings]] passed to this method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
@ -228,13 +227,24 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
requireClusterRole(settings.role)
if (settings.shouldHostShard(cluster)) {
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entityProps, settings,
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
shardRegion
} else {
log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...", typeName)
startProxy(
typeName,
settings.role,
dataCenter = None, // startProxy method must be used directly to start a proxy for another DC
extractEntityId,
extractShardId)
}
}
/**
@ -245,6 +255,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
*
* This method will start a [[ShardRegion]] in proxy mode in case if there is no match between the
* node roles and the role specified in the [[ClusterShardingSettings]] passed to this method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
@ -265,9 +278,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef = {
val allocationStrategy = new LeastShardAllocationStrategy(
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
val allocationStrategy = defaultShardAllocationStrategy(settings)
start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
}
@ -277,6 +288,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
* for this type can later be retrieved with the [[#shardRegion]] method.
*
* This method will start a [[ShardRegion]] in proxy mode in case if there is no match between the
* node roles and the role specified in the [[ClusterShardingSettings]] passed to this method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
@ -321,6 +335,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
*
* This method will start a [[ShardRegion]] in proxy mode in case if there is no match between the
* node roles and the role specified in the [[ClusterShardingSettings]] passed to this method.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
*
@ -337,9 +354,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
settings: ClusterShardingSettings,
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
val allocationStrategy = new LeastShardAllocationStrategy(
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
val allocationStrategy = defaultShardAllocationStrategy(settings)
start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill)
}
@ -517,6 +532,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
}
}
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
}
}
/**

View file

@ -7,7 +7,9 @@ import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.InternalApi
import com.typesafe.config.Config
import akka.cluster.Cluster
import akka.cluster.singleton.ClusterSingletonManagerSettings
object ClusterShardingSettings {
@ -204,6 +206,11 @@ final class ClusterShardingSettings(
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData,
s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'")
/** If true, this node should run the shard region, otherwise just a shard proxy should started on this node. */
@InternalApi
private[akka] def shouldHostShard(cluster: Cluster): Boolean =
role.forall(cluster.selfMember.roles.contains)
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role)

View file

@ -111,7 +111,7 @@ object ShardRegion {
*/
def entityMessage(message: Any): Any
/**
* Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]]
* Extract the shard id from an incoming `message`. Only messages that passed the [[#entityId]]
* function will be used as input to this function.
*/
def shardId(message: Any): String

View file

@ -0,0 +1,42 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.{ ExtendedActorSystem, PoisonPill, Props }
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.testkit.AkkaSpec
import org.mockito.ArgumentMatchers
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
class ClusterShardingInternalsSpec extends AkkaSpec("""akka.actor.provider = "cluster"""") with MockitoSugar {
val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem]))
"ClusterSharding" must {
"start a region in proxy mode in case of node role mismatch" in {
val settingsWithRole = ClusterShardingSettings(system).withRole("nonExistingRole")
val typeName = "typeName"
val extractEntityId = mock[ShardRegion.ExtractEntityId]
val extractShardId = mock[ShardRegion.ExtractShardId]
clusterSharding.start(
typeName = typeName,
entityProps = Props.empty,
settings = settingsWithRole,
extractEntityId = extractEntityId,
extractShardId = extractShardId,
allocationStrategy = mock[ShardAllocationStrategy],
handOffStopMessage = PoisonPill)
verify(clusterSharding).startProxy(
ArgumentMatchers.eq(typeName),
ArgumentMatchers.eq(settingsWithRole.role),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(extractEntityId),
ArgumentMatchers.eq(extractShardId))
}
}
}

View file

@ -61,6 +61,9 @@ You may define it another way, but it must be unique.
When using the sharding extension you are first, typically at system startup on each node
in the cluster, supposed to register the supported entity types with the `ClusterSharding.start`
method. `ClusterSharding.start` gives you the reference which you can pass along.
Please note that `ClusterSharding.start` will start a `ShardRegion` in [proxy only mode](#proxy-only-mode)
in case if there is no match between the roles of the current cluster node and the role specified in
`ClusterShardingSettings`.
Scala
: @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #counter-start }
@ -279,8 +282,10 @@ See @ref:[How To Startup when Cluster Size Reached](cluster-usage.md#min-members
The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not
host any entities itself, but knows how to delegate messages to the right location.
A `ShardRegion` is started in proxy only mode with the method `ClusterSharding.startProxy`
method.
A `ShardRegion` is started in proxy only mode with the `ClusterSharding.startProxy` method.
Also a `ShardRegion` is started in proxy only mode in case if there is no match between the
roles of the current cluster node and the role specified in `ClusterShardingSettings`
passed to the `ClusterSharding.start` method.
## Passivation