!cto #17454 Introduce ClusterSingletonManagerSettings and ClusterSingletonProxySettings

This commit is contained in:
Patrik Nordwall 2015-04-29 18:23:45 +02:00
parent ebc39ef9ab
commit 7ab5da21d3
16 changed files with 288 additions and 160 deletions

View file

@ -4,12 +4,14 @@
package akka.cluster.singleton
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
import akka.actor.Actor
import akka.actor.Actor.Receive
import akka.actor.Deploy
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.Address
@ -21,6 +23,109 @@ import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.AkkaException
import akka.actor.NoSerializationVerificationNeeded
object ClusterSingletonManagerSettings {
/**
* Create settings from the default configuration
* `akka.cluster.singleton`.
*/
def apply(system: ActorSystem): ClusterSingletonManagerSettings =
apply(system.settings.config.getConfig("akka.cluster.singleton"))
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.singleton`.
*/
def apply(config: Config): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(
singletonName = config.getString("singleton-name"),
role = roleOption(config.getString("role")),
maxHandOverRetries = config.getInt("max-hand-over-retries"),
maxTakeOverRetries = config.getInt("max-take-over-retries"),
retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis)
/**
* Java API: Create settings from the default configuration
* `akka.cluster.singleton`.
*/
def create(system: ActorSystem): ClusterSingletonManagerSettings = apply(system)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.singleton`.
*/
def create(config: Config): ClusterSingletonManagerSettings = apply(config)
/**
* INTERNAL API
*/
private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
}
/**
* @param singletonName The actor name of the child singleton actor.
*
* @param role Singleton among the nodes tagged with specified role.
* If the role is not specified it's a singleton among all nodes in
* the cluster.
*
* @param maxHandOverRetries When a node is becoming oldest it sends
* hand-over request to previous oldest. This is retried with the
* `retryInterval` until the previous oldest confirms that the hand
* over has started, or this `maxHandOverRetries` limit has been
* reached. If the retry limit is reached it takes the decision to be
* the new oldest if previous oldest is unknown (typically removed),
* otherwise it initiates a new round by throwing
* [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting
* restart with fresh state. For a cluster with many members you might
* need to increase this retry limit because it takes longer time to
* propagate changes across all nodes.
*
* @param maxTakeOverRetries When a oldest node leaves the cluster it is
* not oldest any more and then it sends take over request to the new oldest to
* initiate the hand-over process. This is retried with the `retryInterval` until
* this retry limit has been reached. If the retry limit is reached it initiates
* a new round by throwing [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]]
* and expecting restart with fresh state. This will also cause the singleton actor
* to be stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to
* ensure that new oldest doesn't start singleton actor before previous is
* stopped for certain corner cases.
*
* @param retryInterval Interval for hand over and take over messages
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
val role: Option[String],
val maxHandOverRetries: Int,
val maxTakeOverRetries: Int,
val retryInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
// to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases
require(maxTakeOverRetries < maxHandOverRetries,
s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]")
def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name)
def withRole(role: String): ClusterSingletonManagerSettings = copy(role = ClusterSingletonManagerSettings.roleOption(role))
def withRole(role: Option[String]) = copy(role = role)
def withRetry(maxHandOverRetries: Int, maxTakeOverRetries: Int, retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
copy(maxHandOverRetries = maxHandOverRetries,
maxTakeOverRetries = maxTakeOverRetries,
retryInterval = retryInterval)
private def copy(singletonName: String = singletonName,
role: Option[String] = role,
maxHandOverRetries: Int = maxHandOverRetries,
maxTakeOverRetries: Int = maxTakeOverRetries,
retryInterval: FiniteDuration = retryInterval): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, maxHandOverRetries, maxTakeOverRetries, retryInterval)
}
object ClusterSingletonManager {
@ -29,39 +134,9 @@ object ClusterSingletonManager {
*/
def props(
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: Option[String],
maxHandOverRetries: Int = 10,
maxTakeOverRetries: Int = 5,
retryInterval: FiniteDuration = 1.second): Props =
Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role,
maxHandOverRetries, maxTakeOverRetries, retryInterval).withDeploy(Deploy.local)
/**
* Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]].
*/
def props(
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: String,
maxHandOverRetries: Int,
maxTakeOverRetries: Int,
retryInterval: FiniteDuration): Props =
props(singletonProps, singletonName, terminationMessage,
ClusterSingletonManager.Internal.roleOption(role), maxHandOverRetries, maxTakeOverRetries, retryInterval)
/**
* Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]]
* with default values.
*/
def defaultProps(
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: String): Props =
props(singletonProps, singletonName, terminationMessage, ClusterSingletonManager.Internal.roleOption(role))
settings: ClusterSingletonManagerSettings): Props =
Props(new ClusterSingletonManager(singletonProps, terminationMessage, settings)).withDeploy(Deploy.local)
/**
* INTERNAL API
@ -132,11 +207,6 @@ object ClusterSingletonManager {
val TakeOverRetryTimer = "take-over-retry"
val CleanupTimer = "cleanup"
def roleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
object OldestChangedBuffer {
/**
* Request to deliver one more event.
@ -292,13 +362,10 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
* Use factory method [[ClusterSingletonManager#props]] to create the
* [[akka.actor.Props]] for the actor.
*
* ==Arguments==
*
* '''''singletonProps''''' [[akka.actor.Props]] of the singleton actor instance.
* @param singletonProps [[akka.actor.Props]] of the singleton actor instance.
*
* '''''singletonName''''' The actor name of the child singleton actor.
*
* '''''terminationMessage''''' When handing over to a new oldest node
* @param terminationMessage When handing over to a new oldest node
* this `terminationMessage` is sent to the singleton actor to tell
* it to finish its work, close resources, and stop.
* The hand-over to the new oldest node is completed when the
@ -306,51 +373,18 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
* Note that [[akka.actor.PoisonPill]] is a perfectly fine
* `terminationMessage` if you only need to stop the actor.
*
* '''''role''''' Singleton among the nodes tagged with specified role.
* If the role is not specified it's a singleton among all nodes in
* the cluster.
*
* '''''maxHandOverRetries''''' When a node is becoming oldest it sends
* hand-over request to previous oldest. This is retried with the
* `retryInterval` until the previous oldest confirms that the hand
* over has started, or this `maxHandOverRetries` limit has been
* reached. If the retry limit is reached it takes the decision to be
* the new oldest if previous oldest is unknown (typically removed),
* otherwise it initiates a new round by throwing
* [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting
* restart with fresh state. For a cluster with many members you might
* need to increase this retry limit because it takes longer time to
* propagate changes across all nodes.
*
* '''''maxTakeOverRetries''''' When a oldest node is not oldest any more
* it sends take over request to the new oldest to initiate the normal
* hand-over process. This is especially useful when new node joins and becomes
* oldest immediately, without knowing who was previous oldest. This is retried
* with the `retryInterval` until this retry limit has been reached. If the retry
* limit is reached it initiates a new round by throwing
* [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting
* restart with fresh state. This will also cause the singleton actor to be
* stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to
* ensure that new oldest doesn't start singleton actor before previous is
* stopped for certain corner cases.
* @param settings see [[ClusterSingletonManagerSettings]]
*/
class ClusterSingletonManager(
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: Option[String],
maxHandOverRetries: Int,
maxTakeOverRetries: Int,
retryInterval: FiniteDuration)
settings: ClusterSingletonManagerSettings)
extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] {
// to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases
require(maxTakeOverRetries < maxHandOverRetries,
s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]")
import ClusterSingletonManager._
import ClusterSingletonManager.Internal._
import ClusterSingletonManager.Internal.OldestChangedBuffer._
import settings._
val cluster = Cluster(context.system)
val selfAddressOption = Some(cluster.selfAddress)