diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala
index a05cb244a1..c73df15558 100644
--- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala
+++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala
@@ -247,8 +247,8 @@ final class ClusterShardingSettings(
    */
   @InternalApi
   private[akka] def shouldHostShard(cluster: Cluster): Boolean =
-    (role.isEmpty || cluster.selfMember.roles(role.get)) &&
-      (dataCenter.isEmpty || cluster.selfMember.dataCenter.contains(dataCenter.get))
+    role.forall(cluster.selfMember.roles.contains) &&
+      dataCenter.forall(cluster.selfMember.dataCenter.contains)
 
   def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.option(role))
 
diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.11.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.11.backwards.excludes
new file mode 100644
index 0000000000..d04da5e924
--- /dev/null
+++ b/akka-cluster-sharding/src/main/mima-filters/2.5.11.backwards.excludes
@@ -0,0 +1,2 @@
+# #23952 automatically choose startProxy in ClusterSharding
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ClusterSharding.requireClusterRole")
\ No newline at end of file
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
index 05f1168dc2..41db7e398d 100755
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
@@ -7,7 +7,10 @@ import java.net.URLEncoder
 import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.JavaConverters._
+import scala.collection.immutable
 import scala.concurrent.Await
+import scala.util.control.NonFatal
 
 import akka.actor.Actor
 import akka.actor.ActorRef
@@ -20,23 +23,19 @@ import akka.actor.ExtensionIdProvider
 import akka.actor.NoSerializationVerificationNeeded
 import akka.actor.PoisonPill
 import akka.actor.Props
-import akka.cluster.Cluster
-import akka.cluster.singleton.ClusterSingletonManager
-import akka.pattern.BackoffSupervisor
-import akka.util.ByteString
-import akka.pattern.ask
-import akka.dispatch.Dispatchers
-import akka.cluster.ddata.ReplicatorSettings
-import akka.cluster.ddata.Replicator
-import scala.util.control.NonFatal
-
 import akka.actor.Status
+import akka.annotation.InternalApi
+import akka.cluster.Cluster
 import akka.cluster.ClusterSettings
 import akka.cluster.ClusterSettings.DataCenter
-import scala.collection.immutable
-import scala.collection.JavaConverters._
-
-import akka.annotation.InternalApi
+import akka.cluster.ddata.Replicator
+import akka.cluster.ddata.ReplicatorSettings
+import akka.cluster.singleton.ClusterSingletonManager
+import akka.dispatch.Dispatchers
+import akka.event.Logging
+import akka.pattern.BackoffSupervisor
+import akka.pattern.ask
+import akka.util.ByteString
 
 /**
  * This extension provides sharding functionality of actors in a cluster.
@@ -158,8 +157,10 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv
  */
 class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   import ClusterShardingGuardian._
-  import ShardCoordinator.ShardAllocationStrategy
   import ShardCoordinator.LeastShardAllocationStrategy
+  import ShardCoordinator.ShardAllocationStrategy
+
+  private val log = Logging(system, this.getClass)
 
   private val cluster = Cluster(system)
 
@@ -177,16 +178,14 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
     system.systemActorOf(Props[ClusterShardingGuardian].withDispatcher(dispatcher), guardianName)
   }
 
-  private[akka] def requireClusterRole(role: Option[String]): Unit =
-    require(
-      role.forall(cluster.selfRoles.contains),
-      s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
-
   /**
    * Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor
    * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
    * for this type can later be retrieved with the [[shardRegion]] method.
    *
+   * This method will start a [[ShardRegion]] in proxy mode if the roles of the current cluster node
+   * do not include the role specified in the [[ClusterShardingSettings]] passed to this method.
+   *
    * Some settings can be configured as described in the `akka.cluster.sharding` section
    * of the `reference.conf`.
    *
@@ -228,13 +227,24 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
     allocationStrategy: ShardAllocationStrategy,
     handOffStopMessage: Any): ActorRef = {
 
-    requireClusterRole(settings.role)
-    implicit val timeout = system.settings.CreationTimeout
-    val startMsg = Start(typeName, entityProps, settings,
-      extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
-    val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
-    regions.put(typeName, shardRegion)
-    shardRegion
+    if (settings.shouldHostShard(cluster)) {
+
+      implicit val timeout = system.settings.CreationTimeout
+      val startMsg = Start(typeName, entityProps, settings,
+        extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
+      val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
+      regions.put(typeName, shardRegion)
+      shardRegion
+    } else {
+      log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...", typeName)
+
+      startProxy(
+        typeName,
+        settings.role,
+        dataCenter = None, // startProxy method must be used directly to start a proxy for another DC
+        extractEntityId,
+        extractShardId)
+    }
   }
 
   /**
@@ -245,6 +255,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
    * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
    *
+   * This method will start a [[ShardRegion]] in proxy mode if the node roles do not include the role
+   * specified in the [[ClusterShardingSettings]] passed to this method.
+   *
    * Some settings can be configured as described in the `akka.cluster.sharding` section
    * of the `reference.conf`.
   *
@@ -265,9 +278,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
     extractEntityId: ShardRegion.ExtractEntityId,
     extractShardId: ShardRegion.ExtractShardId): ActorRef = {
 
-    val allocationStrategy = new LeastShardAllocationStrategy(
-      settings.tuningParameters.leastShardAllocationRebalanceThreshold,
-      settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
+    val allocationStrategy = defaultShardAllocationStrategy(settings)
     start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
   }
 
@@ -277,6 +288,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
    * for this type can later be retrieved with the [[#shardRegion]] method.
    *
+   * This method will start a [[ShardRegion]] in proxy mode if the node roles do not include the role
+   * specified in the [[ClusterShardingSettings]] passed to this method.
+   *
    * Some settings can be configured as described in the `akka.cluster.sharding` section
    * of the `reference.conf`.
    *
@@ -321,6 +335,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
    * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
    * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
    *
+   * This method will start a [[ShardRegion]] in proxy mode if the node roles do not include the role
+   * specified in the [[ClusterShardingSettings]] passed to this method.
+   *
    * Some settings can be configured as described in the `akka.cluster.sharding` section
    * of the `reference.conf`.
    *
@@ -337,9 +354,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
     settings: ClusterShardingSettings,
     messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
 
-    val allocationStrategy = new LeastShardAllocationStrategy(
-      settings.tuningParameters.leastShardAllocationRebalanceThreshold,
-      settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
+    val allocationStrategy = defaultShardAllocationStrategy(settings)
     start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill)
   }
 
@@ -517,6 +532,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
     }
   }
 
+  def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
+    val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
+    val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
+    new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
+  }
 }
 
 /**
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
index e4fbce1f87..3bd91faf4d 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
@@ -7,7 +7,9 @@ import scala.concurrent.duration._
 import scala.concurrent.duration.FiniteDuration
 import akka.actor.ActorSystem
 import akka.actor.NoSerializationVerificationNeeded
+import akka.annotation.InternalApi
 import com.typesafe.config.Config
+import akka.cluster.Cluster
 import akka.cluster.singleton.ClusterSingletonManagerSettings
 
 object ClusterShardingSettings {
@@ -204,6 +206,11 @@ final class ClusterShardingSettings(
     stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData,
     s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'")
 
+  /** If true, this node should run the shard region, otherwise just a shard proxy should be started on this node. */
+  @InternalApi
+  private[akka] def shouldHostShard(cluster: Cluster): Boolean =
+    role.forall(cluster.selfMember.roles.contains)
+
   def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
 
   def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role)
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index fc134805d9..77a200663c 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -111,7 +111,7 @@ object ShardRegion {
      */
     def entityMessage(message: Any): Any
     /**
-     * Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]]
+     * Extract the shard id from an incoming `message`. Only messages that passed the [[#entityId]]
      * function will be used as input to this function.
      */
     def shardId(message: Any): String
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala
new file mode 100644
index 0000000000..a6cb01b0fb
--- /dev/null
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+package akka.cluster.sharding
+
+import akka.actor.{ ExtendedActorSystem, PoisonPill, Props }
+import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
+import akka.testkit.AkkaSpec
+import org.mockito.ArgumentMatchers
+import org.mockito.Mockito._
+import org.scalatest.mockito.MockitoSugar
+
+class ClusterShardingInternalsSpec extends AkkaSpec("""akka.actor.provider = "cluster"""") with MockitoSugar {
+
+  val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem]))
+
+  "ClusterSharding" must {
+    "start a region in proxy mode in case of node role mismatch" in {
+
+      val settingsWithRole = ClusterShardingSettings(system).withRole("nonExistingRole")
+      val typeName = "typeName"
+      val extractEntityId = mock[ShardRegion.ExtractEntityId]
+      val extractShardId = mock[ShardRegion.ExtractShardId]
+
+      clusterSharding.start(
+        typeName = typeName,
+        entityProps = Props.empty,
+        settings = settingsWithRole,
+        extractEntityId = extractEntityId,
+        extractShardId = extractShardId,
+        allocationStrategy = mock[ShardAllocationStrategy],
+        handOffStopMessage = PoisonPill)
+
+      verify(clusterSharding).startProxy(
+        ArgumentMatchers.eq(typeName),
+        ArgumentMatchers.eq(settingsWithRole.role),
+        ArgumentMatchers.eq(None),
+        ArgumentMatchers.eq(extractEntityId),
+        ArgumentMatchers.eq(extractShardId))
+    }
+  }
+}
diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md
index b5e79f5508..f6a76fcea6 100644
--- a/akka-docs/src/main/paradox/cluster-sharding.md
+++ b/akka-docs/src/main/paradox/cluster-sharding.md
@@ -61,6 +61,9 @@ You may define it another way, but it must be unique.
 When using the sharding extension you are first, typically at system startup on each node
 in the cluster, supposed to register the supported entity types with the `ClusterSharding.start`
 method. `ClusterSharding.start` gives you the reference which you can pass along.
+Please note that `ClusterSharding.start` will start a `ShardRegion` in [proxy only mode](#proxy-only-mode)
+if the roles of the current cluster node do not include the role specified in
+`ClusterShardingSettings`.
 
 Scala
 :   @@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #counter-start }
@@ -279,8 +282,10 @@ See @ref:[How To Startup when Cluster Size Reached](cluster-usage.md#min-members
 
 The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not host any
 entities itself, but knows how to delegate messages to the right location.
 
-A `ShardRegion` is started in proxy only mode with the method `ClusterSharding.startProxy`
-method.
+A `ShardRegion` is started in proxy only mode with the `ClusterSharding.startProxy` method.
+A `ShardRegion` is also started in proxy only mode when the roles of the current cluster node
+do not include the role specified in the `ClusterShardingSettings`
+passed to the `ClusterSharding.start` method.
 
 ## Passivation
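
For context, here is a minimal sketch (not part of the patch) of how the changed `start` overload behaves from a caller's point of view. The `Counter` actor, the `backend` role, and the extractors are hypothetical and only serve to make the example self-contained; the point is that the same call now returns a full `ShardRegion` on nodes carrying the configured role and a proxy-only region everywhere else, instead of failing the old `requireClusterRole` check.

```scala
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

// Hypothetical entity actor, only here to make the sketch compile.
class Counter extends Actor {
  def receive = {
    case _ => () // a real entity would handle its messages here
  }
}

object ShardingStartExample {
  // Hypothetical extractors: messages are assumed to be (entityId, payload) pairs.
  val numberOfShards = 10

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg @ (id: String, _) => (id, msg)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case (id: String, _) => (math.abs(id.hashCode) % numberOfShards).toString
    case _               => "unknown"
  }

  def startCounterRegion(system: ActorSystem): ActorRef =
    // On a node that has the "backend" role this starts a full ShardRegion;
    // on any other node the same call now falls back to a proxy-only region.
    ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = Props[Counter],
      settings = ClusterShardingSettings(system).withRole("backend"),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
}
```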
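The patch also lifts the previously inlined `LeastShardAllocationStrategy` construction into the public `defaultShardAllocationStrategy(settings)` helper on the `ClusterSharding` extension. The sketch below, reusing the hypothetical `Counter` and extractors from the example above and a hypothetical `GracefulStop` message, shows a caller that needs the full overload (custom `handOffStopMessage`) but still wants the default allocation strategy.

```scala
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }

object ShardingStartWithExplicitStrategy {
  // Hypothetical stop message; entities are expected to stop themselves when they receive it.
  case object GracefulStop

  def startCounterRegion(system: ActorSystem): ActorRef = {
    val sharding = ClusterSharding(system)
    val settings = ClusterShardingSettings(system).withRole("backend")
    sharding.start(
      typeName = "Counter",
      entityProps = Props[Counter],
      settings = settings,
      extractEntityId = ShardingStartExample.extractEntityId,
      extractShardId = ShardingStartExample.extractShardId,
      // the same strategy the shorter overloads now obtain via the new helper
      allocationStrategy = sharding.defaultShardAllocationStrategy(settings),
      handOffStopMessage = GracefulStop)
  }
}
```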