2015-06-07 14:49:38 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.cluster.sharding
|
|
|
|
|
|
|
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
import scala.concurrent.duration.FiniteDuration
|
|
|
|
|
import akka.actor.ActorSystem
|
|
|
|
|
import akka.actor.NoSerializationVerificationNeeded
|
|
|
|
|
import com.typesafe.config.Config
|
2015-06-07 15:34:21 +02:00
|
|
|
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
2015-06-07 14:49:38 +02:00
|
|
|
|
|
|
|
|
object ClusterShardingSettings {

  /**
   * Build the settings from the `akka.cluster.sharding` section of the
   * configuration of the given actor system.
   */
  def apply(system: ActorSystem): ClusterShardingSettings = {
    val shardingConfig = system.settings.config.getConfig("akka.cluster.sharding")
    apply(shardingConfig)
  }

  /**
   * Build the settings from a configuration section that is laid out like
   * the default `akka.cluster.sharding` section.
   */
  def apply(config: Config): ClusterShardingSettings = {
    // Reads a duration entry of `config` with millisecond resolution.
    def millisAt(path: String): FiniteDuration =
      config.getDuration(path, MILLISECONDS).millis

    // Entries are read in the declaration order of TuningParameters.
    val tuning = new TuningParameters(
      coordinatorFailureBackoff = millisAt("coordinator-failure-backoff"),
      retryInterval = millisAt("retry-interval"),
      bufferSize = config.getInt("buffer-size"),
      handOffTimeout = millisAt("handoff-timeout"),
      shardStartTimeout = millisAt("shard-start-timeout"),
      shardFailureBackoff = millisAt("shard-failure-backoff"),
      entityRestartBackoff = millisAt("entity-restart-backoff"),
      rebalanceInterval = millisAt("rebalance-interval"),
      snapshotAfter = config.getInt("snapshot-after"),
      leastShardAllocationRebalanceThreshold =
        config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
      leastShardAllocationMaxSimultaneousRebalance =
        config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"))

    val singletonSettings =
      ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton"))

    new ClusterShardingSettings(
      role = roleOption(config.getString("role")),
      rememberEntities = config.getBoolean("remember-entities"),
      journalPluginId = config.getString("journal-plugin-id"),
      snapshotPluginId = config.getString("snapshot-plugin-id"),
      tuningParameters = tuning,
      coordinatorSingletonSettings = singletonSettings)
  }

  /**
   * Java API: Build the settings from the `akka.cluster.sharding` section of
   * the configuration of the given actor system.
   */
  def create(system: ActorSystem): ClusterShardingSettings =
    apply(system)

  /**
   * Java API: Build the settings from a configuration section that is laid
   * out like the default `akka.cluster.sharding` section.
   */
  def create(config: Config): ClusterShardingSettings =
    apply(config)

  /**
   * INTERNAL API
   *
   * Converts a role name to an `Option`: `null` and the empty string both
   * yield `None`, any other value is wrapped in `Some`.
   */
  private[akka] def roleOption(role: String): Option[String] =
    Option(role).filterNot(_.isEmpty)

  /**
   * Tuning values of the sharding settings; see the descriptions in reference.conf.
   * The configuration keys named below are the ones `apply(config)` reads
   * each value from.
   *
   * @param coordinatorFailureBackoff read from `coordinator-failure-backoff`
   * @param retryInterval read from `retry-interval`
   * @param bufferSize read from `buffer-size`
   * @param handOffTimeout read from `handoff-timeout`
   * @param shardStartTimeout read from `shard-start-timeout`
   * @param shardFailureBackoff read from `shard-failure-backoff`
   * @param entityRestartBackoff read from `entity-restart-backoff`
   * @param rebalanceInterval read from `rebalance-interval`
   * @param snapshotAfter read from `snapshot-after`
   * @param leastShardAllocationRebalanceThreshold read from
   *   `least-shard-allocation-strategy.rebalance-threshold`
   * @param leastShardAllocationMaxSimultaneousRebalance read from
   *   `least-shard-allocation-strategy.max-simultaneous-rebalance`
   */
  class TuningParameters(
    val coordinatorFailureBackoff: FiniteDuration,
    val retryInterval: FiniteDuration,
    val bufferSize: Int,
    val handOffTimeout: FiniteDuration,
    val shardStartTimeout: FiniteDuration,
    val shardFailureBackoff: FiniteDuration,
    val entityRestartBackoff: FiniteDuration,
    val rebalanceInterval: FiniteDuration,
    val snapshotAfter: Int,
    val leastShardAllocationRebalanceThreshold: Int,
    val leastShardAllocationMaxSimultaneousRebalance: Int)

}
|
|
|
|
|
|
|
|
|
|
/**
 * Immutable settings for cluster sharding. Modified copies are obtained
 * through the `with...` methods.
 *
 * @param role restricts this entity type to cluster nodes that have the given role.
 *   When no role is given all nodes in the cluster are used.
 * @param rememberEntities true if active entity actors shall be restarted
 *   automatically when their `Shard` is restarted, i.e. when the `Shard` is started
 *   on a different `ShardRegion` because of a rebalance or a crash.
 * @param journalPluginId absolute path to the journal plugin configuration entity
 *   that is to be used for the internal persistence of ClusterSharding. The default
 *   journal plugin is used when it is not defined. This is unrelated to the
 *   persistence used by the entity actors.
 * @param snapshotPluginId absolute path to the snapshot plugin configuration entity
 *   that is to be used for the internal persistence of ClusterSharding. The default
 *   snapshot plugin is used when it is not defined. This is unrelated to the
 *   persistence used by the entity actors.
 * @param tuningParameters additional tuning parameters, see descriptions in reference.conf
 * @param coordinatorSingletonSettings cluster singleton manager settings; the companion
 *   object reads them from the `coordinator-singleton` configuration section
 */
final class ClusterShardingSettings(
  val role: Option[String],
  val rememberEntities: Boolean,
  val journalPluginId: String,
  val snapshotPluginId: String,
  val tuningParameters: ClusterShardingSettings.TuningParameters,
  val coordinatorSingletonSettings: ClusterSingletonManagerSettings)
  extends NoSerializationVerificationNeeded {

  /**
   * Copy of these settings with the given role; `null` and the empty string
   * mean that no role is required.
   */
  def withRole(role: String): ClusterShardingSettings =
    withRole(ClusterShardingSettings.roleOption(role))

  /** Copy of these settings with the given optional role. */
  def withRole(role: Option[String]): ClusterShardingSettings =
    copy(role = role)

  /** Copy of these settings with the given `rememberEntities` flag. */
  def withRememberEntities(rememberEntities: Boolean): ClusterShardingSettings =
    copy(rememberEntities = rememberEntities)

  /** Copy of these settings with the given journal plugin id. */
  def withJournalPluginId(journalPluginId: String): ClusterShardingSettings =
    copy(journalPluginId = journalPluginId)

  /** Copy of these settings with the given snapshot plugin id. */
  def withSnapshotPluginId(snapshotPluginId: String): ClusterShardingSettings =
    copy(snapshotPluginId = snapshotPluginId)

  /** Copy of these settings with the given tuning parameters. */
  def withTuningParameters(
    tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings =
    copy(tuningParameters = tuningParameters)

  /** Copy of these settings with the given coordinator singleton settings. */
  def withCoordinatorSingletonSettings(
    coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings =
    copy(coordinatorSingletonSettings = coordinatorSingletonSettings)

  // Creates a new instance; every argument that is not given keeps its current value.
  private def copy(
    role: Option[String] = role,
    rememberEntities: Boolean = rememberEntities,
    journalPluginId: String = journalPluginId,
    snapshotPluginId: String = snapshotPluginId,
    tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
    coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings =
    new ClusterShardingSettings(
      role = role,
      rememberEntities = rememberEntities,
      journalPluginId = journalPluginId,
      snapshotPluginId = snapshotPluginId,
      tuningParameters = tuningParameters,
      coordinatorSingletonSettings = coordinatorSingletonSettings)

}
|