diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index 651cc630c5..51a7bcf905 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -55,7 +55,8 @@ object ClusterShardingSettings { classicSettings.coordinatorSingletonSettings.singletonName, classicSettings.coordinatorSingletonSettings.role, classicSettings.coordinatorSingletonSettings.removalMargin, - classicSettings.coordinatorSingletonSettings.handOverRetryInterval), + classicSettings.coordinatorSingletonSettings.handOverRetryInterval, + classicSettings.coordinatorSingletonSettings.leaseSettings), leaseSettings = classicSettings.leaseSettings) } @@ -99,7 +100,8 @@ object ClusterShardingSettings { settings.coordinatorSingletonSettings.singletonName, settings.coordinatorSingletonSettings.role, settings.coordinatorSingletonSettings.removalMargin, - settings.coordinatorSingletonSettings.handOverRetryInterval), + settings.coordinatorSingletonSettings.handOverRetryInterval, + settings.coordinatorSingletonSettings.leaseSettings), leaseSettings = settings.leaseSettings) } @@ -449,6 +451,8 @@ final class ClusterShardingSettings( def withShardRegionQueryTimeout(duration: java.time.Duration): ClusterShardingSettings = copy(shardRegionQueryTimeout = duration.asScala) + def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings)) + /** * The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the * coordinator singleton will be the same as the `role` of `ClusterShardingSettings`. diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala index 34b83a5d67..7367462dc5 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala @@ -4,13 +4,9 @@ package akka.cluster.typed -import scala.concurrent.duration._ -import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.duration.{ Duration, FiniteDuration, _ } -import com.typesafe.config.Config - -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } -import akka.actor.typed.ExtensionSetup +import akka.actor.typed._ import akka.annotation.{ DoNotInherit, InternalApi } import akka.cluster.ClusterSettings.DataCenter import akka.cluster.singleton.{ @@ -18,7 +14,9 @@ import akka.cluster.singleton.{ ClusterSingletonManagerSettings => ClassicClusterSingletonManagerSettings } import akka.cluster.typed.internal.AdaptedClusterSingletonImpl +import akka.coordination.lease.LeaseUsageSettings import akka.util.JavaDurationConverters._ +import com.typesafe.config.Config object ClusterSingletonSettings { def apply(system: ActorSystem[_]): ClusterSingletonSettings = @@ -40,7 +38,8 @@ object ClusterSingletonSettings { proxySettings.singletonIdentificationInterval, mgrSettings.removalMargin, mgrSettings.handOverRetryInterval, - proxySettings.bufferSize) + proxySettings.bufferSize, + mgrSettings.leaseSettings) } } @@ -50,7 +49,19 @@ final class ClusterSingletonSettings( val singletonIdentificationInterval: FiniteDuration, val removalMargin: FiniteDuration, val handOverRetryInterval: FiniteDuration, - val bufferSize: Int) { + val bufferSize: Int, + val leaseSettings: Option[LeaseUsageSettings]) { + + // bin compat for 2.6.14 + @deprecated("Use constructor with leaseSettings", "2.6.15") + def this( + role: Option[String], + dataCenter: Option[DataCenter], + singletonIdentificationInterval: FiniteDuration, + removalMargin: FiniteDuration, + handOverRetryInterval: FiniteDuration, + bufferSize: Int) = + this(role, dataCenter, singletonIdentificationInterval, removalMargin, handOverRetryInterval, bufferSize, None) def withRole(role: String): ClusterSingletonSettings = copy(role = Some(role)) @@ -61,37 +72,43 @@ final class ClusterSingletonSettings( def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None) def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin) + def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonSettings = withRemovalMargin(removalMargin.asScala) def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings = copy(handOverRetryInterval = handOverRetryInterval) + def withHandoverRetryInterval(handOverRetryInterval: java.time.Duration): ClusterSingletonSettings = withHandoverRetryInterval(handOverRetryInterval.asScala) def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize) + def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings)) + private def copy( role: Option[String] = role, dataCenter: Option[DataCenter] = dataCenter, singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval, removalMargin: FiniteDuration = removalMargin, handOverRetryInterval: FiniteDuration = handOverRetryInterval, - bufferSize: Int = bufferSize) = + bufferSize: Int = bufferSize, + leaseSettings: Option[LeaseUsageSettings] = leaseSettings) = new ClusterSingletonSettings( role, dataCenter, singletonIdentificationInterval, removalMargin, handOverRetryInterval, - bufferSize) + bufferSize, + leaseSettings) /** * INTERNAL API: */ @InternalApi private[akka] def toManagerSettings(singletonName: String): ClassicClusterSingletonManagerSettings = - new ClassicClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) + new ClassicClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings) /** * INTERNAL API: @@ -112,7 +129,7 @@ final class ClusterSingletonSettings( } override def toString = - s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize)" + s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize, $leaseSettings)" } object ClusterSingleton extends ExtensionId[ClusterSingleton] { @@ -217,12 +234,19 @@ object ClusterSingletonManagerSettings { * Create settings from a configuration with the same layout as * the default configuration `akka.cluster.singleton`. */ - def apply(config: Config): ClusterSingletonManagerSettings = + def apply(config: Config): ClusterSingletonManagerSettings = { + val lease = config.getString("use-lease") match { + case s if s.isEmpty => None + case leaseConfigPath => + Some(new LeaseUsageSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala)) + } new ClusterSingletonManagerSettings( singletonName = config.getString("singleton-name"), role = roleOption(config.getString("role")), removalMargin = Duration.Zero, // defaults to ClusterSettings.DownRemovalMargin - handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis) + handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis, + lease) + } /** * Java API: Create settings from the default configuration @@ -245,30 +269,38 @@ object ClusterSingletonManagerSettings { } /** - * @param singletonName The actor name of the child singleton actor. - * - * @param role Singleton among the nodes tagged with specified role. - * If the role is not specified it's a singleton among all nodes in - * the cluster. - * - * @param removalMargin Margin until the singleton instance that belonged to - * a downed/removed partition is created in surviving partition. The purpose of - * this margin is that in case of a network partition the singleton actors - * in the non-surviving partitions must be stopped before corresponding actors - * are started somewhere else. This is especially important for persistent - * actors. - * + * @param singletonName The actor name of the child singleton actor. + * @param role Singleton among the nodes tagged with specified role. + * If the role is not specified it's a singleton among all nodes in + * the cluster. + * @param removalMargin Margin until the singleton instance that belonged to + * a downed/removed partition is created in surviving partition. The purpose of + * this margin is that in case of a network partition the singleton actors + * in the non-surviving partitions must be stopped before corresponding actors + * are started somewhere else. This is especially important for persistent + * actors. * @param handOverRetryInterval When a node is becoming oldest it sends hand-over - * request to previous oldest, that might be leaving the cluster. This is - * retried with this interval until the previous oldest confirms that the hand - * over has started or the previous oldest member is removed from the cluster - * (+ `removalMargin`). + * request to previous oldest, that might be leaving the cluster. This is + * retried with this interval until the previous oldest confirms that the hand + * over has started or the previous oldest member is removed from the cluster + * (+ `removalMargin`). + * @param leaseSettings LeaseSettings for acquiring before creating the singleton actor */ final class ClusterSingletonManagerSettings( val singletonName: String, val role: Option[String], val removalMargin: FiniteDuration, - val handOverRetryInterval: FiniteDuration) { + val handOverRetryInterval: FiniteDuration, + val leaseSettings: Option[LeaseUsageSettings]) { + + // bin compat for 2.6.14 + @deprecated("Use constructor with leaseSettings", "2.6.15") + def this( + singletonName: String, + role: Option[String], + removalMargin: FiniteDuration, + handOverRetryInterval: FiniteDuration) = + this(singletonName, role, removalMargin, handOverRetryInterval, None) def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name) @@ -279,20 +311,25 @@ final class ClusterSingletonManagerSettings( def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings = copy(removalMargin = removalMargin) + def withRemovalMargin(removalMargin: java.time.Duration): ClusterSingletonManagerSettings = withRemovalMargin(removalMargin.asScala) def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings = copy(handOverRetryInterval = retryInterval) + def withHandOverRetryInterval(retryInterval: java.time.Duration): ClusterSingletonManagerSettings = withHandOverRetryInterval(retryInterval.asScala) + def withLeaseSettings(leaseSettings: LeaseUsageSettings) = copy(leaseSettings = Option(leaseSettings)) + private def copy( singletonName: String = singletonName, role: Option[String] = role, removalMargin: FiniteDuration = removalMargin, - handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings = - new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) + handOverRetryInterval: FiniteDuration = handOverRetryInterval, + leaseSettings: Option[LeaseUsageSettings] = leaseSettings): ClusterSingletonManagerSettings = + new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings) } object ClusterSingletonSetup {