pekko/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala

951 lines
40 KiB
Scala
Raw Normal View History

/*
2021-01-08 17:55:38 +01:00
* Copyright (C) 2015-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.ActorSystem
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.cluster.Cluster
import akka.cluster.singleton.ClusterSingletonManagerSettings
import akka.coordination.lease.LeaseUsageSettings
import akka.japi.Util.immutableSeq
import akka.util.Helpers.toRootLowerCase
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config
import scala.collection.immutable
import scala.concurrent.duration._
object ClusterShardingSettings {
val StateStoreModePersistence = "persistence"
val StateStoreModeDData = "ddata"
/**
2020-06-11 15:45:00 +02:00
* Only for testing
* INTERNAL API
*/
@InternalApi
private[akka] val RememberEntitiesStoreCustom = "custom"
/**
* INTERNAL API
*/
@InternalApi
private[akka] val RememberEntitiesStoreDData = "ddata"
/**
* INTERNAL API
*/
@InternalApi
private[akka] val RememberEntitiesStoreEventsourced = "eventsourced"
/**
* Create settings from the default configuration
* `akka.cluster.sharding`.
*/
def apply(system: ActorSystem): ClusterShardingSettings =
apply(system.settings.config.getConfig("akka.cluster.sharding"))
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.sharding`.
*/
def apply(config: Config): ClusterShardingSettings = {
def configMajorityPlus(p: String): Int = {
toRootLowerCase(config.getString(p)) match {
case "all" => Int.MaxValue
case _ => config.getInt(p)
}
}
val tuningParameters = new TuningParameters(
coordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis,
retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis,
bufferSize = config.getInt("buffer-size"),
handOffTimeout = config.getDuration("handoff-timeout", MILLISECONDS).millis,
shardStartTimeout = config.getDuration("shard-start-timeout", MILLISECONDS).millis,
shardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis,
entityRestartBackoff = config.getDuration("entity-restart-backoff", MILLISECONDS).millis,
rebalanceInterval = config.getDuration("rebalance-interval", MILLISECONDS).millis,
snapshotAfter = config.getInt("snapshot-after"),
#21725 cluster-sharding doesn't delete snapshots and messages (#21777) * #21725 cluster-sharding doesn't delete snapshots and messages Fixes #21725 Without deleting messages those pollute persistence with not needed anymore messages. Naive and bullet proof flow is snapshot -> delete messges -> delete snapshots. # Пожалуйста, введите сообщение коммита для ваших изменений. Строки, # начинающиеся с «#» будут оставлены; вы можете удалить их вручную, # если хотите. Пустое сообщение отменяет процесс коммита. # # Дата: Mon Oct 31 23:24:37 2016 +0300 # # интерактивное перемещение в процессе; над 432b53c # Последняя команда выполнена (1 команда выполнена): # edit f86b015 21725 cluster-sharding doesn't delete snapshots and messages Fixes #21725 Without deleting messages those pollute persistence with not needed anymore messages. Naive and bullet proof flow is snapshot -> delete messges -> delete snapshots. # Следующая команда для выполнения (1 команда осталась): # pick 56adb40 #21725 keeping N number of batches (messages and snapshot) using N from configuration # Вы сейчас редактируете коммит при перемещении ветки «fix-21725-delete-messages-after-snapshot» над «432b53c». # # Изменения, которые будут включены в коммит: # изменено: akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala # изменено: akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala # * #21725 keeping N number of batches (messages and snapshot) using N from configuration
2017-02-21 16:17:19 +03:00
keepNrOfBatches = config.getInt("keep-nr-of-batches"),
2019-03-11 10:38:24 +01:00
leastShardAllocationRebalanceThreshold = config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
leastShardAllocationMaxSimultaneousRebalance =
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"),
waitingForStateTimeout = config.getDuration("waiting-for-state-timeout", MILLISECONDS).millis,
updatingStateTimeout = config.getDuration("updating-state-timeout", MILLISECONDS).millis,
entityRecoveryStrategy = config.getString("entity-recovery-strategy"),
2019-03-11 10:38:24 +01:00
entityRecoveryConstantRateStrategyFrequency =
config.getDuration("entity-recovery-constant-rate-strategy.frequency", MILLISECONDS).millis,
entityRecoveryConstantRateStrategyNumberOfEntities =
config.getInt("entity-recovery-constant-rate-strategy.number-of-entities"),
coordinatorStateWriteMajorityPlus = configMajorityPlus("coordinator-state.write-majority-plus"),
coordinatorStateReadMajorityPlus = configMajorityPlus("coordinator-state.read-majority-plus"),
leastShardAllocationAbsoluteLimit = config.getInt("least-shard-allocation-strategy.rebalance-absolute-limit"),
leastShardAllocationRelativeLimit = config.getDouble("least-shard-allocation-strategy.rebalance-relative-limit"))
val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton"))
val passivationStrategySettings = PassivationStrategySettings(config)
val lease = config.getString("use-lease") match {
2019-04-15 17:40:26 +02:00
case s if s.isEmpty => None
case other => Some(new LeaseUsageSettings(other, config.getDuration("lease-retry-interval").asScala))
}
2019-03-13 10:56:20 +01:00
new ClusterShardingSettings(
role = roleOption(config.getString("role")),
rememberEntities = config.getBoolean("remember-entities"),
journalPluginId = config.getString("journal-plugin-id"),
snapshotPluginId = config.getString("snapshot-plugin-id"),
stateStoreMode = config.getString("state-store-mode"),
rememberEntitiesStore = config.getString("remember-entities-store"),
passivationStrategySettings = passivationStrategySettings,
shardRegionQueryTimeout = config.getDuration("shard-region-query-timeout", MILLISECONDS).millis,
2019-03-13 10:56:20 +01:00
tuningParameters,
coordinatorSingletonSettings,
lease)
}
/**
* Java API: Create settings from the default configuration
* `akka.cluster.sharding`.
*/
def create(system: ActorSystem): ClusterShardingSettings = apply(system)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.sharding`.
*/
def create(config: Config): ClusterShardingSettings = apply(config)
/**
* INTERNAL API
*/
private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
@ApiMayChange
final class PassivationStrategySettings private (
val strategy: String,
val idleSettings: PassivationStrategySettings.IdleSettings,
val leastRecentlyUsedSettings: PassivationStrategySettings.LeastRecentlyUsedSettings,
val mostRecentlyUsedSettings: PassivationStrategySettings.MostRecentlyUsedSettings,
val leastFrequentlyUsedSettings: PassivationStrategySettings.LeastFrequentlyUsedSettings,
private[akka] val oldSettingUsed: Boolean) {
def this(
strategy: String,
idleSettings: PassivationStrategySettings.IdleSettings,
leastRecentlyUsedSettings: PassivationStrategySettings.LeastRecentlyUsedSettings,
mostRecentlyUsedSettings: PassivationStrategySettings.MostRecentlyUsedSettings,
leastFrequentlyUsedSettings: PassivationStrategySettings.LeastFrequentlyUsedSettings) =
this(
strategy,
idleSettings,
leastRecentlyUsedSettings,
mostRecentlyUsedSettings,
leastFrequentlyUsedSettings,
oldSettingUsed = false)
import PassivationStrategySettings._
def withIdleStrategy(settings: IdleSettings): PassivationStrategySettings =
copy(strategy = "idle", idleSettings = settings, oldSettingUsed = false)
def withLeastRecentlyUsedStrategy(settings: LeastRecentlyUsedSettings): PassivationStrategySettings =
copy(strategy = "least-recently-used", leastRecentlyUsedSettings = settings)
def withMostRecentlyUsedStrategy(settings: MostRecentlyUsedSettings): PassivationStrategySettings =
copy(strategy = "most-recently-used", mostRecentlyUsedSettings = settings)
def withLeastFrequentlyUsedStrategy(settings: LeastFrequentlyUsedSettings): PassivationStrategySettings =
copy(strategy = "least-frequently-used", leastFrequentlyUsedSettings = settings)
private[akka] def withOldIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings =
copy(strategy = "idle", idleSettings = idleSettings.withTimeout(timeout), oldSettingUsed = true)
private def copy(
strategy: String,
idleSettings: IdleSettings = idleSettings,
leastRecentlyUsedSettings: LeastRecentlyUsedSettings = leastRecentlyUsedSettings,
mostRecentlyUsedSettings: MostRecentlyUsedSettings = mostRecentlyUsedSettings,
leastFrequentlyUsedSettings: LeastFrequentlyUsedSettings = leastFrequentlyUsedSettings,
oldSettingUsed: Boolean = oldSettingUsed): PassivationStrategySettings =
new PassivationStrategySettings(
strategy,
idleSettings,
leastRecentlyUsedSettings,
mostRecentlyUsedSettings,
leastFrequentlyUsedSettings,
oldSettingUsed)
}
object PassivationStrategySettings {
val defaults = new PassivationStrategySettings(
strategy = "idle",
IdleSettings.defaults,
LeastRecentlyUsedSettings.defaults,
MostRecentlyUsedSettings.defaults,
LeastFrequentlyUsedSettings.defaults,
oldSettingUsed = false)
val disabled: PassivationStrategySettings = defaults.copy(strategy = "none")
object IdleSettings {
val defaults: IdleSettings = new IdleSettings(timeout = 2.minutes, interval = None)
def apply(config: Config): IdleSettings = {
val timeout = config.getDuration("timeout", MILLISECONDS).millis
val interval =
if (toRootLowerCase(config.getString("interval")) == "default") None
else Some(config.getDuration("interval", MILLISECONDS).millis)
new IdleSettings(timeout, interval)
}
def optional(config: Config): Option[IdleSettings] = {
if (toRootLowerCase(config.getString("timeout")) == "off") None else Some(IdleSettings(config))
}
}
final class IdleSettings(val timeout: FiniteDuration, val interval: Option[FiniteDuration]) {
def withTimeout(timeout: FiniteDuration): IdleSettings = copy(timeout = timeout)
def withTimeout(timeout: java.time.Duration): IdleSettings = withTimeout(timeout.asScala)
def withInterval(interval: FiniteDuration): IdleSettings = copy(interval = Some(interval))
def withInterval(interval: java.time.Duration): IdleSettings = withInterval(interval.asScala)
private def copy(timeout: FiniteDuration = timeout, interval: Option[FiniteDuration] = interval): IdleSettings =
new IdleSettings(timeout, interval)
}
object LeastRecentlyUsedSettings {
val defaults: LeastRecentlyUsedSettings =
new LeastRecentlyUsedSettings(limit = 100000, segmentedSettings = None, idleSettings = None)
def apply(config: Config): LeastRecentlyUsedSettings = {
val limit = config.getInt("limit")
val idleSettings = IdleSettings.optional(config.getConfig("idle"))
val segmentedSettings = SegmentedSettings.optional(config.getConfig("segmented"))
new LeastRecentlyUsedSettings(limit, segmentedSettings, idleSettings)
}
object SegmentedSettings {
def apply(config: Config): SegmentedSettings = {
val levels = config.getInt("levels")
val proportions = immutableSeq(config.getDoubleList("proportions")).map(_.toDouble)
new SegmentedSettings(levels, proportions)
}
def optional(config: Config): Option[SegmentedSettings] = {
toRootLowerCase(config.getString("levels")) match {
case "off" | "none" => None
case _ => Some(SegmentedSettings(config))
}
}
}
final class SegmentedSettings(val levels: Int, val proportions: immutable.Seq[Double]) {
def withLevels(levels: Int): SegmentedSettings = copy(levels = levels)
def withProportions(proportions: immutable.Seq[Double]): SegmentedSettings = copy(proportions = proportions)
def withProportions(proportions: java.util.List[java.lang.Double]): SegmentedSettings =
copy(proportions = immutableSeq(proportions).map(_.toDouble))
private def copy(levels: Int = levels, proportions: immutable.Seq[Double] = proportions): SegmentedSettings =
new SegmentedSettings(levels, proportions)
}
}
final class LeastRecentlyUsedSettings(
val limit: Int,
val segmentedSettings: Option[LeastRecentlyUsedSettings.SegmentedSettings],
val idleSettings: Option[IdleSettings]) {
def withLimit(limit: Int): LeastRecentlyUsedSettings = copy(limit = limit)
def withSegmented(levels: Int): LeastRecentlyUsedSettings = withSegmented(levels, Nil)
def withSegmented(levels: Int, proportions: immutable.Seq[Double]): LeastRecentlyUsedSettings =
copy(segmentedSettings = Some(new LeastRecentlyUsedSettings.SegmentedSettings(levels, proportions)))
def withSegmentedProportions(
levels: Int,
proportions: java.util.List[java.lang.Double]): LeastRecentlyUsedSettings =
withSegmented(levels, immutableSeq(proportions).map(_.toDouble))
def withIdle(timeout: FiniteDuration): LeastRecentlyUsedSettings =
copy(idleSettings = Some(new IdleSettings(timeout, None)))
def withIdle(timeout: java.time.Duration): LeastRecentlyUsedSettings =
withIdle(timeout.asScala)
def withIdle(timeout: FiniteDuration, interval: FiniteDuration): LeastRecentlyUsedSettings =
copy(idleSettings = Some(new IdleSettings(timeout, Some(interval))))
def withIdle(timeout: java.time.Duration, interval: java.time.Duration): LeastRecentlyUsedSettings =
withIdle(timeout.asScala, interval.asScala)
private def copy(
limit: Int = limit,
segmentedSettings: Option[LeastRecentlyUsedSettings.SegmentedSettings] = segmentedSettings,
idleSettings: Option[IdleSettings] = idleSettings): LeastRecentlyUsedSettings =
new LeastRecentlyUsedSettings(limit, segmentedSettings, idleSettings)
}
object MostRecentlyUsedSettings {
val defaults: MostRecentlyUsedSettings = new MostRecentlyUsedSettings(limit = 100000, idleSettings = None)
def apply(config: Config): MostRecentlyUsedSettings = {
val limit = config.getInt("limit")
val idleSettings = IdleSettings.optional(config.getConfig("idle"))
new MostRecentlyUsedSettings(limit, idleSettings)
}
}
final class MostRecentlyUsedSettings(val limit: Int, val idleSettings: Option[IdleSettings]) {
def withLimit(limit: Int): MostRecentlyUsedSettings = copy(limit = limit)
def withIdle(timeout: FiniteDuration): MostRecentlyUsedSettings =
copy(idleSettings = Some(new IdleSettings(timeout, None)))
def withIdle(timeout: java.time.Duration): MostRecentlyUsedSettings =
withIdle(timeout.asScala)
def withIdle(timeout: FiniteDuration, interval: FiniteDuration): MostRecentlyUsedSettings =
copy(idleSettings = Some(new IdleSettings(timeout, Some(interval))))
def withIdle(timeout: java.time.Duration, interval: java.time.Duration): MostRecentlyUsedSettings =
withIdle(timeout.asScala, interval.asScala)
private def copy(
limit: Int = limit,
idleSettings: Option[IdleSettings] = idleSettings): MostRecentlyUsedSettings =
new MostRecentlyUsedSettings(limit, idleSettings)
}
object LeastFrequentlyUsedSettings {
val defaults: LeastFrequentlyUsedSettings =
new LeastFrequentlyUsedSettings(limit = 100000, dynamicAging = false, idleSettings = None)
def apply(config: Config): LeastFrequentlyUsedSettings = {
val limit = config.getInt("limit")
val dynamicAging = config.getBoolean("dynamic-aging")
val idleSettings = IdleSettings.optional(config.getConfig("idle"))
new LeastFrequentlyUsedSettings(limit, dynamicAging, idleSettings)
}
}
final class LeastFrequentlyUsedSettings(
val limit: Int,
val dynamicAging: Boolean,
val idleSettings: Option[IdleSettings]) {
def withLimit(limit: Int): LeastFrequentlyUsedSettings = copy(limit = limit)
def withDynamicAging(): LeastFrequentlyUsedSettings = withDynamicAging(enabled = true)
def withDynamicAging(enabled: Boolean): LeastFrequentlyUsedSettings = copy(dynamicAging = enabled)
def withIdle(timeout: FiniteDuration): LeastFrequentlyUsedSettings =
copy(idleSettings = Some(new IdleSettings(timeout, None)))
def withIdle(timeout: java.time.Duration): LeastFrequentlyUsedSettings =
withIdle(timeout.asScala)
def withIdle(timeout: FiniteDuration, interval: FiniteDuration): LeastFrequentlyUsedSettings =
copy(idleSettings = Some(new IdleSettings(timeout, Some(interval))))
def withIdle(timeout: java.time.Duration, interval: java.time.Duration): LeastFrequentlyUsedSettings =
withIdle(timeout.asScala, interval.asScala)
private def copy(
limit: Int = limit,
dynamicAging: Boolean = dynamicAging,
idleSettings: Option[IdleSettings] = idleSettings): LeastFrequentlyUsedSettings =
new LeastFrequentlyUsedSettings(limit, dynamicAging, idleSettings)
}
def apply(config: Config): PassivationStrategySettings = {
val settings =
new PassivationStrategySettings(
strategy = toRootLowerCase(config.getString("passivation.strategy")),
idleSettings = IdleSettings(config.getConfig("passivation.idle")),
leastRecentlyUsedSettings = LeastRecentlyUsedSettings(config.getConfig("passivation.least-recently-used")),
mostRecentlyUsedSettings = MostRecentlyUsedSettings(config.getConfig("passivation.most-recently-used")),
leastFrequentlyUsedSettings = LeastFrequentlyUsedSettings(
config.getConfig("passivation.least-frequently-used")))
// default to old setting if it exists (defined in application.conf), overriding the new settings
if (config.hasPath("passivate-idle-entity-after")) {
val timeout =
if (toRootLowerCase(config.getString("passivate-idle-entity-after")) == "off") Duration.Zero
else config.getDuration("passivate-idle-entity-after", MILLISECONDS).millis
settings.withOldIdleStrategy(timeout)
} else {
settings
}
}
private[akka] def oldDefault(idleTimeout: FiniteDuration): PassivationStrategySettings =
defaults.withOldIdleStrategy(idleTimeout)
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] sealed trait PassivationStrategy
private[akka] case object NoPassivationStrategy extends PassivationStrategy
private[akka] object IdlePassivationStrategy {
def apply(settings: PassivationStrategySettings.IdleSettings): IdlePassivationStrategy =
IdlePassivationStrategy(settings.timeout, settings.interval.getOrElse(settings.timeout / 2))
}
private[akka] case class IdlePassivationStrategy(timeout: FiniteDuration, interval: FiniteDuration)
extends PassivationStrategy
private[akka] object LeastRecentlyUsedPassivationStrategy {
def apply(settings: PassivationStrategySettings.LeastRecentlyUsedSettings): LeastRecentlyUsedPassivationStrategy = {
val limit = settings.limit
val idle = settings.idleSettings.map(IdlePassivationStrategy.apply)
settings.segmentedSettings match {
case Some(segmented) =>
val proportions =
if (segmented.levels < 2) Nil
else if (segmented.proportions.isEmpty) List.fill(segmented.levels)(1.0 / segmented.levels)
else segmented.proportions
LeastRecentlyUsedPassivationStrategy(limit, proportions, idle)
case _ => LeastRecentlyUsedPassivationStrategy(limit, Nil, idle)
}
}
}
private[akka] case class LeastRecentlyUsedPassivationStrategy(
limit: Int,
segmented: immutable.Seq[Double],
idle: Option[IdlePassivationStrategy])
extends PassivationStrategy
private[akka] object MostRecentlyUsedPassivationStrategy {
def apply(settings: PassivationStrategySettings.MostRecentlyUsedSettings): MostRecentlyUsedPassivationStrategy =
MostRecentlyUsedPassivationStrategy(settings.limit, settings.idleSettings.map(IdlePassivationStrategy.apply))
}
private[akka] case class MostRecentlyUsedPassivationStrategy(limit: Int, idle: Option[IdlePassivationStrategy])
extends PassivationStrategy
private[akka] object LeastFrequentlyUsedPassivationStrategy {
def apply(
settings: PassivationStrategySettings.LeastFrequentlyUsedSettings): LeastFrequentlyUsedPassivationStrategy =
LeastFrequentlyUsedPassivationStrategy(
settings.limit,
settings.dynamicAging,
settings.idleSettings.map(IdlePassivationStrategy.apply))
}
private[akka] case class LeastFrequentlyUsedPassivationStrategy(
limit: Int,
dynamicAging: Boolean,
idle: Option[IdlePassivationStrategy])
extends PassivationStrategy
/**
* INTERNAL API
* Determine the passivation strategy to use from settings.
*/
@InternalApi
private[akka] object PassivationStrategy {
def apply(settings: ClusterShardingSettings): PassivationStrategy = {
if (settings.rememberEntities) {
NoPassivationStrategy
} else
settings.passivationStrategySettings.strategy match {
case "idle" if settings.passivationStrategySettings.idleSettings.timeout > Duration.Zero =>
IdlePassivationStrategy(settings.passivationStrategySettings.idleSettings)
case "least-recently-used" =>
LeastRecentlyUsedPassivationStrategy(settings.passivationStrategySettings.leastRecentlyUsedSettings)
case "most-recently-used" =>
MostRecentlyUsedPassivationStrategy(settings.passivationStrategySettings.mostRecentlyUsedSettings)
case "least-frequently-used" =>
LeastFrequentlyUsedPassivationStrategy(settings.passivationStrategySettings.leastFrequentlyUsedSettings)
case _ => NoPassivationStrategy
}
}
}
2019-03-13 10:56:20 +01:00
class TuningParameters(
val coordinatorFailureBackoff: FiniteDuration,
val retryInterval: FiniteDuration,
val bufferSize: Int,
val handOffTimeout: FiniteDuration,
val shardStartTimeout: FiniteDuration,
val shardFailureBackoff: FiniteDuration,
val entityRestartBackoff: FiniteDuration,
val rebalanceInterval: FiniteDuration,
val snapshotAfter: Int,
val keepNrOfBatches: Int,
val leastShardAllocationRebalanceThreshold: Int,
val leastShardAllocationMaxSimultaneousRebalance: Int,
val waitingForStateTimeout: FiniteDuration,
val updatingStateTimeout: FiniteDuration,
val entityRecoveryStrategy: String,
val entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
val entityRecoveryConstantRateStrategyNumberOfEntities: Int,
val coordinatorStateWriteMajorityPlus: Int,
val coordinatorStateReadMajorityPlus: Int,
val leastShardAllocationAbsoluteLimit: Int,
val leastShardAllocationRelativeLimit: Double) {
2019-03-13 10:56:20 +01:00
require(
entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'")
// included for binary compatibility
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including " +
"leastShardAllocationAbsoluteLimit and leastShardAllocationRelativeLimit instead",
since = "2.6.10")
def this(
coordinatorFailureBackoff: FiniteDuration,
retryInterval: FiniteDuration,
bufferSize: Int,
handOffTimeout: FiniteDuration,
shardStartTimeout: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entityRestartBackoff: FiniteDuration,
rebalanceInterval: FiniteDuration,
snapshotAfter: Int,
keepNrOfBatches: Int,
leastShardAllocationRebalanceThreshold: Int,
leastShardAllocationMaxSimultaneousRebalance: Int,
waitingForStateTimeout: FiniteDuration,
updatingStateTimeout: FiniteDuration,
entityRecoveryStrategy: String,
entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
entityRecoveryConstantRateStrategyNumberOfEntities: Int,
coordinatorStateWriteMajorityPlus: Int,
coordinatorStateReadMajorityPlus: Int) =
this(
coordinatorFailureBackoff,
retryInterval,
bufferSize,
handOffTimeout,
shardStartTimeout,
shardFailureBackoff,
entityRestartBackoff,
rebalanceInterval,
snapshotAfter,
keepNrOfBatches,
leastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout,
updatingStateTimeout,
entityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities,
coordinatorStateWriteMajorityPlus,
coordinatorStateReadMajorityPlus,
leastShardAllocationAbsoluteLimit = 100,
leastShardAllocationRelativeLimit = 0.1)
#21725 cluster-sharding doesn't delete snapshots and messages (#21777) * #21725 cluster-sharding doesn't delete snapshots and messages Fixes #21725 Without deleting messages those pollute persistence with not needed anymore messages. Naive and bullet proof flow is snapshot -> delete messges -> delete snapshots. # Пожалуйста, введите сообщение коммита для ваших изменений. Строки, # начинающиеся с «#» будут оставлены; вы можете удалить их вручную, # если хотите. Пустое сообщение отменяет процесс коммита. # # Дата: Mon Oct 31 23:24:37 2016 +0300 # # интерактивное перемещение в процессе; над 432b53c # Последняя команда выполнена (1 команда выполнена): # edit f86b015 21725 cluster-sharding doesn't delete snapshots and messages Fixes #21725 Without deleting messages those pollute persistence with not needed anymore messages. Naive and bullet proof flow is snapshot -> delete messges -> delete snapshots. # Следующая команда для выполнения (1 команда осталась): # pick 56adb40 #21725 keeping N number of batches (messages and snapshot) using N from configuration # Вы сейчас редактируете коммит при перемещении ветки «fix-21725-delete-messages-after-snapshot» над «432b53c». # # Изменения, которые будут включены в коммит: # изменено: akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala # изменено: akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala # * #21725 keeping N number of batches (messages and snapshot) using N from configuration
2017-02-21 16:17:19 +03:00
// included for binary compatibility
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including " +
"coordinatorStateWriteMajorityPlus and coordinatorStateReadMajorityPlus instead",
since = "2.6.5")
def this(
coordinatorFailureBackoff: FiniteDuration,
retryInterval: FiniteDuration,
bufferSize: Int,
handOffTimeout: FiniteDuration,
shardStartTimeout: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entityRestartBackoff: FiniteDuration,
rebalanceInterval: FiniteDuration,
snapshotAfter: Int,
keepNrOfBatches: Int,
leastShardAllocationRebalanceThreshold: Int,
leastShardAllocationMaxSimultaneousRebalance: Int,
waitingForStateTimeout: FiniteDuration,
updatingStateTimeout: FiniteDuration,
entityRecoveryStrategy: String,
entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
entityRecoveryConstantRateStrategyNumberOfEntities: Int) =
this(
coordinatorFailureBackoff,
retryInterval,
bufferSize,
handOffTimeout,
shardStartTimeout,
shardFailureBackoff,
entityRestartBackoff,
rebalanceInterval,
snapshotAfter,
keepNrOfBatches,
leastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout,
updatingStateTimeout,
entityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities,
coordinatorStateWriteMajorityPlus = 5,
coordinatorStateReadMajorityPlus = 5)
// included for binary compatibility
@deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "2.6.5")
2019-03-13 10:56:20 +01:00
def this(
coordinatorFailureBackoff: FiniteDuration,
retryInterval: FiniteDuration,
bufferSize: Int,
handOffTimeout: FiniteDuration,
shardStartTimeout: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entityRestartBackoff: FiniteDuration,
rebalanceInterval: FiniteDuration,
snapshotAfter: Int,
leastShardAllocationRebalanceThreshold: Int,
leastShardAllocationMaxSimultaneousRebalance: Int,
waitingForStateTimeout: FiniteDuration,
updatingStateTimeout: FiniteDuration,
entityRecoveryStrategy: String,
entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
entityRecoveryConstantRateStrategyNumberOfEntities: Int) = {
this(
coordinatorFailureBackoff,
retryInterval,
bufferSize,
handOffTimeout,
shardStartTimeout,
shardFailureBackoff,
entityRestartBackoff,
rebalanceInterval,
snapshotAfter,
2,
leastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout,
updatingStateTimeout,
entityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities)
#21725 cluster-sharding doesn't delete snapshots and messages (#21777) * #21725 cluster-sharding doesn't delete snapshots and messages Fixes #21725 Without deleting messages those pollute persistence with not needed anymore messages. Naive and bullet proof flow is snapshot -> delete messges -> delete snapshots. # Пожалуйста, введите сообщение коммита для ваших изменений. Строки, # начинающиеся с «#» будут оставлены; вы можете удалить их вручную, # если хотите. Пустое сообщение отменяет процесс коммита. # # Дата: Mon Oct 31 23:24:37 2016 +0300 # # интерактивное перемещение в процессе; над 432b53c # Последняя команда выполнена (1 команда выполнена): # edit f86b015 21725 cluster-sharding doesn't delete snapshots and messages Fixes #21725 Without deleting messages those pollute persistence with not needed anymore messages. Naive and bullet proof flow is snapshot -> delete messges -> delete snapshots. # Следующая команда для выполнения (1 команда осталась): # pick 56adb40 #21725 keeping N number of batches (messages and snapshot) using N from configuration # Вы сейчас редактируете коммит при перемещении ветки «fix-21725-delete-messages-after-snapshot» над «432b53c». # # Изменения, которые будут включены в коммит: # изменено: akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala # изменено: akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala # * #21725 keeping N number of batches (messages and snapshot) using N from configuration
2017-02-21 16:17:19 +03:00
}
// included for binary compatibility
@deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "2.6.5")
2019-03-13 10:56:20 +01:00
def this(
coordinatorFailureBackoff: FiniteDuration,
retryInterval: FiniteDuration,
bufferSize: Int,
handOffTimeout: FiniteDuration,
shardStartTimeout: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entityRestartBackoff: FiniteDuration,
rebalanceInterval: FiniteDuration,
snapshotAfter: Int,
leastShardAllocationRebalanceThreshold: Int,
leastShardAllocationMaxSimultaneousRebalance: Int,
waitingForStateTimeout: FiniteDuration,
updatingStateTimeout: FiniteDuration) = {
this(
coordinatorFailureBackoff,
retryInterval,
bufferSize,
handOffTimeout,
shardStartTimeout,
shardFailureBackoff,
entityRestartBackoff,
rebalanceInterval,
snapshotAfter,
leastShardAllocationRebalanceThreshold,
leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout,
updatingStateTimeout,
"all",
100.milliseconds,
5)
}
}
}
/**
* @param role specifies that this entity type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntities true if active entity actors shall be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param journalPluginId Absolute path to the journal plugin configuration entity that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* journal plugin is used. Note that this is not related to persistence used by the entity
* actors.
* @param snapshotPluginId Absolute path to the snapshot plugin configuration entity that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* snapshot plugin is used. Note that this is not related to persistence used by the entity
* actors.
* @param passivationStrategySettings settings for automatic passivation strategy, see descriptions in reference.conf
* @param tuningParameters additional tuning parameters, see descriptions in reference.conf
* @param shardRegionQueryTimeout the timeout for querying a shard region, see descriptions in reference.conf
*/
2019-03-13 10:56:20 +01:00
final class ClusterShardingSettings(
val role: Option[String],
val rememberEntities: Boolean,
val journalPluginId: String,
val snapshotPluginId: String,
val stateStoreMode: String,
val rememberEntitiesStore: String,
val passivationStrategySettings: ClusterShardingSettings.PassivationStrategySettings,
val shardRegionQueryTimeout: FiniteDuration,
2019-03-13 10:56:20 +01:00
val tuningParameters: ClusterShardingSettings.TuningParameters,
val coordinatorSingletonSettings: ClusterSingletonManagerSettings,
val leaseSettings: Option[LeaseUsageSettings])
2019-03-11 10:38:24 +01:00
extends NoSerializationVerificationNeeded {
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including passivationStrategySettings instead",
"2.6.18")
def this(
role: Option[String],
rememberEntities: Boolean,
journalPluginId: String,
snapshotPluginId: String,
stateStoreMode: String,
rememberEntitiesStore: String,
passivateIdleEntityAfter: FiniteDuration,
shardRegionQueryTimeout: FiniteDuration,
tuningParameters: ClusterShardingSettings.TuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings,
leaseSettings: Option[LeaseUsageSettings]) =
this(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
rememberEntitiesStore,
ClusterShardingSettings.PassivationStrategySettings.oldDefault(passivateIdleEntityAfter),
shardRegionQueryTimeout,
tuningParameters,
coordinatorSingletonSettings,
leaseSettings)
@deprecated(
2020-06-11 16:04:23 +02:00
"Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStore instead",
"2.6.7")
def this(
role: Option[String],
rememberEntities: Boolean,
journalPluginId: String,
snapshotPluginId: String,
stateStoreMode: String,
passivateIdleEntityAfter: FiniteDuration,
shardRegionQueryTimeout: FiniteDuration,
tuningParameters: ClusterShardingSettings.TuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings,
leaseSettings: Option[LeaseUsageSettings]) =
this(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
"ddata",
passivateIdleEntityAfter,
shardRegionQueryTimeout,
tuningParameters,
coordinatorSingletonSettings,
leaseSettings)
// bin compat for 2.5.23
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including shardRegionQueryTimeout instead",
since = "2.6.0")
def this(
role: Option[String],
rememberEntities: Boolean,
journalPluginId: String,
snapshotPluginId: String,
stateStoreMode: String,
passivateIdleEntityAfter: FiniteDuration,
tuningParameters: ClusterShardingSettings.TuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings,
leaseSettings: Option[LeaseUsageSettings]) =
this(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
passivateIdleEntityAfter,
3.seconds,
tuningParameters,
coordinatorSingletonSettings,
leaseSettings)
// bin compat for 2.5.21
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including shardRegionQueryTimeout instead",
since = "2.5.21")
def this(
role: Option[String],
rememberEntities: Boolean,
journalPluginId: String,
snapshotPluginId: String,
stateStoreMode: String,
passivateIdleEntityAfter: FiniteDuration,
tuningParameters: ClusterShardingSettings.TuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings) =
this(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
passivateIdleEntityAfter,
3.seconds,
tuningParameters,
coordinatorSingletonSettings,
None)
// included for binary compatibility reasons
2019-03-11 10:38:24 +01:00
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including passivateIdleEntityAfter instead",
since = "2.5.18")
2019-03-13 10:56:20 +01:00
def this(
role: Option[String],
rememberEntities: Boolean,
journalPluginId: String,
snapshotPluginId: String,
stateStoreMode: String,
tuningParameters: ClusterShardingSettings.TuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings) =
this(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
Duration.Zero,
tuningParameters,
coordinatorSingletonSettings)
2019-03-11 10:38:24 +01:00
import ClusterShardingSettings.{ RememberEntitiesStoreCustom, StateStoreModeDData, StateStoreModePersistence }
2019-03-13 10:56:20 +01:00
require(
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData || stateStoreMode == RememberEntitiesStoreCustom,
2019-03-13 10:56:20 +01:00
s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'")
/** If true, this node should run the shard region, otherwise just a shard proxy should started on this node. */
@InternalApi
private[akka] def shouldHostShard(cluster: Cluster): Boolean =
role.forall(cluster.selfMember.roles.contains)
@InternalApi
private[akka] val passivationStrategy: ClusterShardingSettings.PassivationStrategy =
ClusterShardingSettings.PassivationStrategy(this)
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role)
def withRememberEntities(rememberEntities: Boolean): ClusterShardingSettings =
copy(rememberEntities = rememberEntities)
def withJournalPluginId(journalPluginId: String): ClusterShardingSettings =
copy(journalPluginId = journalPluginId)
def withSnapshotPluginId(snapshotPluginId: String): ClusterShardingSettings =
copy(snapshotPluginId = snapshotPluginId)
def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings =
copy(tuningParameters = tuningParameters)
def withStateStoreMode(stateStoreMode: String): ClusterShardingSettings =
copy(stateStoreMode = stateStoreMode)
@deprecated("See passivationStrategySettings.idleTimeout instead", since = "2.6.18")
def passivateIdleEntityAfter: FiniteDuration = passivationStrategySettings.idleSettings.timeout
@deprecated("Use withIdlePassivationStrategy instead", since = "2.6.18")
def withPassivateIdleAfter(duration: FiniteDuration): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withOldIdleStrategy(duration))
@deprecated("Use withIdlePassivationStrategy instead", since = "2.6.18")
def withPassivateIdleAfter(duration: java.time.Duration): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withOldIdleStrategy(duration.asScala))
def withPassivationStrategy(settings: ClusterShardingSettings.PassivationStrategySettings): ClusterShardingSettings =
copy(passivationStrategySettings = settings)
def withNoPassivationStrategy(): ClusterShardingSettings =
copy(passivationStrategySettings = ClusterShardingSettings.PassivationStrategySettings.disabled)
def withIdlePassivationStrategy(
settings: ClusterShardingSettings.PassivationStrategySettings.IdleSettings): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withIdleStrategy(settings))
def withLeastRecentlyUsedPassivationStrategy(
settings: ClusterShardingSettings.PassivationStrategySettings.LeastRecentlyUsedSettings)
: ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withLeastRecentlyUsedStrategy(settings))
def withMostRecentlyUsedPassivationStrategy(
settings: ClusterShardingSettings.PassivationStrategySettings.MostRecentlyUsedSettings): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withMostRecentlyUsedStrategy(settings))
def withLeastFrequentlyUsedPassivationStrategy(
settings: ClusterShardingSettings.PassivationStrategySettings.LeastFrequentlyUsedSettings)
: ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withLeastFrequentlyUsedStrategy(settings))
def withShardRegionQueryTimeout(duration: FiniteDuration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration)
def withShardRegionQueryTimeout(duration: java.time.Duration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration.asScala)
def withLeaseSettings(leaseSettings: LeaseUsageSettings): ClusterShardingSettings =
copy(leaseSettings = Some(leaseSettings))
2015-12-18 11:15:06 +01:00
/**
* The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the
* coordinator singleton will be the same as the `role` of `ClusterShardingSettings`.
*/
2019-03-11 10:38:24 +01:00
def withCoordinatorSingletonSettings(
coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings =
copy(coordinatorSingletonSettings = coordinatorSingletonSettings)
2019-03-13 10:56:20 +01:00
private def copy(
role: Option[String] = role,
rememberEntities: Boolean = rememberEntities,
journalPluginId: String = journalPluginId,
snapshotPluginId: String = snapshotPluginId,
stateStoreMode: String = stateStoreMode,
passivationStrategySettings: ClusterShardingSettings.PassivationStrategySettings = passivationStrategySettings,
shardRegionQueryTimeout: FiniteDuration = shardRegionQueryTimeout,
2019-03-13 10:56:20 +01:00
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings,
leaseSettings: Option[LeaseUsageSettings] = leaseSettings): ClusterShardingSettings =
2019-03-13 10:56:20 +01:00
new ClusterShardingSettings(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
rememberEntitiesStore,
passivationStrategySettings,
shardRegionQueryTimeout,
2019-03-13 10:56:20 +01:00
tuningParameters,
coordinatorSingletonSettings,
leaseSettings)
}