From 294659e2fe394782ca32fbffcc42953d932340eb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 7 Jun 2015 15:34:21 +0200 Subject: [PATCH] =cls #15330 Enable configuration of coordinator singleton --- .../src/main/resources/reference.conf | 3 +++ .../cluster/sharding/ClusterSharding.scala | 2 +- .../sharding/ClusterShardingSettings.scala | 19 ++++++++++++++----- .../sharding/ClusterShardingLeavingSpec.scala | 2 +- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 0216c56102..0f94a69e3e 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -78,5 +78,8 @@ akka.cluster.sharding { # The number of ongoing rebalancing processes is limited to this number. max-simultaneous-rebalance = 3 } + + # Settings for the coordinator singleton. Same layout as akka.cluster.singleton. + coordinator-singleton = ${akka.cluster.singleton} } # //#sharding-ext-config diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 90d8a5341d..688c228ab0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -450,7 +450,7 @@ private[akka] class ClusterShardingGuardian extends Actor { if (context.child(cName).isEmpty) { val coordinatorProps = ShardCoordinator.props(settings, allocationStrategy) val singletonProps = ShardCoordinatorSupervisor.props(coordinatorFailureBackoff, coordinatorProps) - val singletonSettings = ClusterSingletonManagerSettings(context.system) + val singletonSettings = settings.coordinatorSingletonSettings .withSingletonName("singleton").withRole(role) context.actorOf(ClusterSingletonManager.props( singletonProps, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 1601386578..48aeec9c91 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -5,10 +5,10 @@ package akka.cluster.sharding import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration - import akka.actor.ActorSystem import akka.actor.NoSerializationVerificationNeeded import com.typesafe.config.Config +import akka.cluster.singleton.ClusterSingletonManagerSettings object ClusterShardingSettings { /** @@ -38,12 +38,15 @@ object ClusterShardingSettings { leastShardAllocationMaxSimultaneousRebalance = config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance")) + val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton")) + new ClusterShardingSettings( role = roleOption(config.getString("role")), rememberEntries = config.getBoolean("remember-entries"), journalPluginId = config.getString("journal-plugin-id"), snapshotPluginId = config.getString("snapshot-plugin-id"), - tuningParameters) + tuningParameters, + coordinatorSingletonSettings) } /** @@ -98,7 +101,8 @@ final class ClusterShardingSettings( val rememberEntries: Boolean, val journalPluginId: String, val snapshotPluginId: String, - val tuningParameters: ClusterShardingSettings.TuningParameters) extends NoSerializationVerificationNeeded { + val tuningParameters: ClusterShardingSettings.TuningParameters, + val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role)) @@ -116,15 +120,20 @@ final class ClusterShardingSettings( def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings = copy(tuningParameters = tuningParameters) + def withCoordinatorSingletonSettings(coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings = + copy(coordinatorSingletonSettings = coordinatorSingletonSettings) + private def copy(role: Option[String] = role, rememberEntries: Boolean = rememberEntries, journalPluginId: String = journalPluginId, snapshotPluginId: String = snapshotPluginId, - tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters): ClusterShardingSettings = + tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, + coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings = new ClusterShardingSettings( role, rememberEntries, journalPluginId, snapshotPluginId, - tuningParameters) + tuningParameters, + coordinatorSingletonSettings) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index 4376d3a069..fdb5f69be6 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -33,7 +33,7 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(ConfigFactory.parseString(""" - akka.loglevel = DEBUG + akka.loglevel = INFO akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s