lease settings support for typed cluster singleton (#30209)

This commit is contained in:
Xiaoguang Zhang 2021-05-19 17:05:40 +08:00 committed by GitHub
parent 18e78816c3
commit a19e73a253
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 36 deletions

View file

@ -55,7 +55,8 @@ object ClusterShardingSettings {
classicSettings.coordinatorSingletonSettings.singletonName,
classicSettings.coordinatorSingletonSettings.role,
classicSettings.coordinatorSingletonSettings.removalMargin,
classicSettings.coordinatorSingletonSettings.handOverRetryInterval),
classicSettings.coordinatorSingletonSettings.handOverRetryInterval,
classicSettings.coordinatorSingletonSettings.leaseSettings),
leaseSettings = classicSettings.leaseSettings)
}
@ -99,7 +100,8 @@ object ClusterShardingSettings {
settings.coordinatorSingletonSettings.singletonName,
settings.coordinatorSingletonSettings.role,
settings.coordinatorSingletonSettings.removalMargin,
settings.coordinatorSingletonSettings.handOverRetryInterval),
settings.coordinatorSingletonSettings.handOverRetryInterval,
settings.coordinatorSingletonSettings.leaseSettings),
leaseSettings = settings.leaseSettings)
}
@ -449,6 +451,8 @@ final class ClusterShardingSettings(
def withShardRegionQueryTimeout(duration: java.time.Duration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration.asScala)
def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))
/**
* The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the
* coordinator singleton will be the same as the `role` of `ClusterShardingSettings`.

View file

@ -4,13 +4,9 @@
package akka.cluster.typed
import scala.concurrent.duration._
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.duration.{ Duration, FiniteDuration, _ }
import com.typesafe.config.Config
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import akka.actor.typed.ExtensionSetup
import akka.actor.typed._
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.singleton.{
@ -18,7 +14,9 @@ import akka.cluster.singleton.{
ClusterSingletonManagerSettings => ClassicClusterSingletonManagerSettings
}
import akka.cluster.typed.internal.AdaptedClusterSingletonImpl
import akka.coordination.lease.LeaseUsageSettings
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config
object ClusterSingletonSettings {
def apply(system: ActorSystem[_]): ClusterSingletonSettings =
@ -40,7 +38,8 @@ object ClusterSingletonSettings {
proxySettings.singletonIdentificationInterval,
mgrSettings.removalMargin,
mgrSettings.handOverRetryInterval,
proxySettings.bufferSize)
proxySettings.bufferSize,
mgrSettings.leaseSettings)
}
}
@ -50,7 +49,19 @@ final class ClusterSingletonSettings(
val singletonIdentificationInterval: FiniteDuration,
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration,
val bufferSize: Int) {
val bufferSize: Int,
val leaseSettings: Option[LeaseUsageSettings]) {
// bin compat for 2.6.14
@deprecated("Use constructor with leaseSettings", "2.6.15")
def this(
role: Option[String],
dataCenter: Option[DataCenter],
singletonIdentificationInterval: FiniteDuration,
removalMargin: FiniteDuration,
handOverRetryInterval: FiniteDuration,
bufferSize: Int) =
this(role, dataCenter, singletonIdentificationInterval, removalMargin, handOverRetryInterval, bufferSize, None)
def withRole(role: String): ClusterSingletonSettings = copy(role = Some(role))
@ -61,37 +72,43 @@ final class ClusterSingletonSettings(
def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None)
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin)
def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonSettings =
withRemovalMargin(removalMargin.asScala)
def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings =
copy(handOverRetryInterval = handOverRetryInterval)
def withHandoverRetryInterval(handOverRetryInterval: java.time.Duration): ClusterSingletonSettings =
withHandoverRetryInterval(handOverRetryInterval.asScala)
def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize)
def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))
private def copy(
role: Option[String] = role,
dataCenter: Option[DataCenter] = dataCenter,
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval,
removalMargin: FiniteDuration = removalMargin,
handOverRetryInterval: FiniteDuration = handOverRetryInterval,
bufferSize: Int = bufferSize) =
bufferSize: Int = bufferSize,
leaseSettings: Option[LeaseUsageSettings] = leaseSettings) =
new ClusterSingletonSettings(
role,
dataCenter,
singletonIdentificationInterval,
removalMargin,
handOverRetryInterval,
bufferSize)
bufferSize,
leaseSettings)
/**
* INTERNAL API:
*/
@InternalApi
private[akka] def toManagerSettings(singletonName: String): ClassicClusterSingletonManagerSettings =
new ClassicClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
new ClassicClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings)
/**
* INTERNAL API:
@ -112,7 +129,7 @@ final class ClusterSingletonSettings(
}
override def toString =
s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize)"
s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize, $leaseSettings)"
}
object ClusterSingleton extends ExtensionId[ClusterSingleton] {
@ -217,12 +234,19 @@ object ClusterSingletonManagerSettings {
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.singleton`.
*/
def apply(config: Config): ClusterSingletonManagerSettings =
def apply(config: Config): ClusterSingletonManagerSettings = {
val lease = config.getString("use-lease") match {
case s if s.isEmpty => None
case leaseConfigPath =>
Some(new LeaseUsageSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala))
}
new ClusterSingletonManagerSettings(
singletonName = config.getString("singleton-name"),
role = roleOption(config.getString("role")),
removalMargin = Duration.Zero, // defaults to ClusterSettings.DownRemovalMargin
handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis)
handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis,
lease)
}
/**
* Java API: Create settings from the default configuration
@ -246,29 +270,37 @@ object ClusterSingletonManagerSettings {
/**
* @param singletonName The actor name of the child singleton actor.
*
* @param role Singleton among the nodes tagged with specified role.
* If the role is not specified it's a singleton among all nodes in
* the cluster.
*
* @param removalMargin Margin until the singleton instance that belonged to
* a downed/removed partition is created in surviving partition. The purpose of
* this margin is that in case of a network partition the singleton actors
* in the non-surviving partitions must be stopped before corresponding actors
* are started somewhere else. This is especially important for persistent
* actors.
*
* @param handOverRetryInterval When a node is becoming oldest it sends hand-over
* request to previous oldest, that might be leaving the cluster. This is
* retried with this interval until the previous oldest confirms that the hand
* over has started or the previous oldest member is removed from the cluster
* (+ `removalMargin`).
* @param leaseSettings LeaseSettings for acquiring before creating the singleton actor
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
val role: Option[String],
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration) {
val handOverRetryInterval: FiniteDuration,
val leaseSettings: Option[LeaseUsageSettings]) {
// bin compat for 2.6.14
@deprecated("Use constructor with leaseSettings", "2.6.15")
def this(
singletonName: String,
role: Option[String],
removalMargin: FiniteDuration,
handOverRetryInterval: FiniteDuration) =
this(singletonName, role, removalMargin, handOverRetryInterval, None)
def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name)
@ -279,20 +311,25 @@ final class ClusterSingletonManagerSettings(
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings =
copy(removalMargin = removalMargin)
def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonManagerSettings =
withRemovalMargin(removalMargin.asScala)
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
copy(handOverRetryInterval = retryInterval)
def withHandOverRetryInterval(retryInterval: java.time.Duration): ClusterSingletonManagerSettings =
withHandOverRetryInterval(retryInterval.asScala)
def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings))
private def copy(
singletonName: String = singletonName,
role: Option[String] = role,
removalMargin: FiniteDuration = removalMargin,
handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
handOverRetryInterval: FiniteDuration = handOverRetryInterval,
leaseSettings: Option[LeaseUsageSettings] = leaseSettings): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings)
}
object ClusterSingletonSetup {