=cls #15330 Enable configuration of coordinator singleton
This commit is contained in:
parent
8de24f38ca
commit
294659e2fe
4 changed files with 19 additions and 7 deletions
|
|
@ -78,5 +78,8 @@ akka.cluster.sharding {
|
|||
# The number of ongoing rebalancing processes is limited to this number.
|
||||
max-simultaneous-rebalance = 3
|
||||
}
|
||||
|
||||
# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
|
||||
coordinator-singleton = ${akka.cluster.singleton}
|
||||
}
|
||||
# //#sharding-ext-config
|
||||
|
|
|
|||
|
|
@ -450,7 +450,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
|||
if (context.child(cName).isEmpty) {
|
||||
val coordinatorProps = ShardCoordinator.props(settings, allocationStrategy)
|
||||
val singletonProps = ShardCoordinatorSupervisor.props(coordinatorFailureBackoff, coordinatorProps)
|
||||
val singletonSettings = ClusterSingletonManagerSettings(context.system)
|
||||
val singletonSettings = settings.coordinatorSingletonSettings
|
||||
.withSingletonName("singleton").withRole(role)
|
||||
context.actorOf(ClusterSingletonManager.props(
|
||||
singletonProps,
|
||||
|
|
|
|||
|
|
@ -5,10 +5,10 @@ package akka.cluster.sharding
|
|||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.NoSerializationVerificationNeeded
|
||||
import com.typesafe.config.Config
|
||||
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
||||
|
||||
object ClusterShardingSettings {
|
||||
/**
|
||||
|
|
@ -38,12 +38,15 @@ object ClusterShardingSettings {
|
|||
leastShardAllocationMaxSimultaneousRebalance =
|
||||
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"))
|
||||
|
||||
val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton"))
|
||||
|
||||
new ClusterShardingSettings(
|
||||
role = roleOption(config.getString("role")),
|
||||
rememberEntries = config.getBoolean("remember-entries"),
|
||||
journalPluginId = config.getString("journal-plugin-id"),
|
||||
snapshotPluginId = config.getString("snapshot-plugin-id"),
|
||||
tuningParameters)
|
||||
tuningParameters,
|
||||
coordinatorSingletonSettings)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -98,7 +101,8 @@ final class ClusterShardingSettings(
|
|||
val rememberEntries: Boolean,
|
||||
val journalPluginId: String,
|
||||
val snapshotPluginId: String,
|
||||
val tuningParameters: ClusterShardingSettings.TuningParameters) extends NoSerializationVerificationNeeded {
|
||||
val tuningParameters: ClusterShardingSettings.TuningParameters,
|
||||
val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded {
|
||||
|
||||
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
|
||||
|
||||
|
|
@ -116,15 +120,20 @@ final class ClusterShardingSettings(
|
|||
def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings =
|
||||
copy(tuningParameters = tuningParameters)
|
||||
|
||||
def withCoordinatorSingletonSettings(coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings =
|
||||
copy(coordinatorSingletonSettings = coordinatorSingletonSettings)
|
||||
|
||||
private def copy(role: Option[String] = role,
|
||||
rememberEntries: Boolean = rememberEntries,
|
||||
journalPluginId: String = journalPluginId,
|
||||
snapshotPluginId: String = snapshotPluginId,
|
||||
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters): ClusterShardingSettings =
|
||||
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
|
||||
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings =
|
||||
new ClusterShardingSettings(
|
||||
role,
|
||||
rememberEntries,
|
||||
journalPluginId,
|
||||
snapshotPluginId,
|
||||
tuningParameters)
|
||||
tuningParameters,
|
||||
coordinatorSingletonSettings)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
|
|||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(ConfigFactory.parseString("""
|
||||
akka.loglevel = DEBUG
|
||||
akka.loglevel = INFO
|
||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.cluster.auto-down-unreachable-after = 0s
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue