diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 66773593f4..c3c97b7b0e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -7,7 +7,6 @@ import java.net.URLEncoder import java.util.concurrent.ConcurrentHashMap import akka.cluster.sharding.Shard.{ ShardCommand, StateChange } import akka.cluster.sharding.ShardCoordinator.Internal.SnapshotTick - import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -39,6 +38,7 @@ import akka.pattern.ask import akka.persistence._ import akka.cluster.ClusterEvent.ClusterDomainEvent import akka.cluster.singleton.ClusterSingletonManager +import akka.cluster.singleton.ClusterSingletonManagerSettings /** * This extension provides sharding functionality of actors in a cluster. @@ -405,11 +405,12 @@ private[akka] class ClusterShardingGuardian extends Actor { val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout, rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy) val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps) + val singletonSettings = ClusterSingletonManagerSettings(context.system) + .withSingletonName("singleton").withRole(role) context.actorOf(ClusterSingletonManager.props( singletonProps, - singletonName = "singleton", terminationMessage = PoisonPill, - role = role), + singletonSettings), name = coordinatorSingletonManagerName) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 03fe92d6f0..98d05a74a5 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -5,7 +5,6 @@ package akka.cluster.sharding import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff } import akka.cluster.sharding.ShardRegion.Passivate - import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -24,6 +23,7 @@ import akka.testkit.TestEvent.Mute import java.io.File import org.apache.commons.io.FileUtils import akka.cluster.singleton.ClusterSingletonManager +import akka.cluster.singleton.ClusterSingletonManagerSettings object ClusterShardingSpec extends MultiNodeConfig { val controller = role("controller") @@ -194,9 +194,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult val rebalanceEnabled = coordinatorName.toLowerCase.startsWith("rebalancing") system.actorOf(ClusterSingletonManager.props( singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps(rebalanceEnabled)), - singletonName = "singleton", terminationMessage = PoisonPill, - role = None), + settings = ClusterSingletonManagerSettings(system)), name = coordinatorName + "Coordinator") } } diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index 7c87468224..e02e9ffb2f 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -75,3 +75,44 @@ akka.cluster.client { # //#cluster-client-mailbox-config +akka.cluster.singleton { + # The actor name of the child singleton actor. + singleton-name = "singleton" + + # Singleton among the nodes tagged with specified role. + # If the role is not specified it's a singleton among all nodes in the cluster. + role = "" + + # When a node is becoming oldest it sends hand-over request to previous oldest. + # This is retried with the 'retry-interval' until the previous oldest confirms + # that the hand over has started, or this -max-hand-over-retries' limit has been + # reached. If the retry limit is reached it takes the decision to be the new oldest + # if previous oldest is unknown (typically removed), otherwise it initiates a new + # round by throwing 'akka.cluster.singleton.ClusterSingletonManagerIsStuck' and expecting + # restart with fresh state. For a cluster with many members you might need to increase + # this retry limit because it takes longer time to propagate changes across all nodes. + max-hand-over-retries = 10 + + # When a oldest node leaves the cluster it is not oldest any more and then it sends + # take over request to the new oldest to initiate the hand-over process. This is + # retried with the 'retry-interval' until this retry limit has been reached. If the + # retry limit is reached it initiates a new round by throwing + # 'akka.cluster.singleton.ClusterSingletonManagerIsStuck' and expecting restart with + # fresh state. This will also cause the singleton actor to be stopped. + # 'max-take-over-retries` must be less than 'max-hand-over-retries' to ensure that + # new oldest doesn't start singleton actor before previous is stopped for certain + # corner cases. + max-take-over-retries = 5 + + # Interval for hand over and take over messages + retry-interval = 1s +} + +akka.cluster.singleton-proxy { + # The role of the cluster nodes where the singleton can be deployed. + # If the role is not specified then any node will do. + role = "" + + # Interval at which the proxy will try to resolve the singleton instance. + singleton-identification-interval = 1s +} \ No newline at end of file diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 439031a249..e1a4bddc9c 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -4,12 +4,14 @@ package akka.cluster.singleton +import com.typesafe.config.Config import scala.concurrent.duration._ import scala.collection.immutable import akka.actor.Actor import akka.actor.Actor.Receive import akka.actor.Deploy import akka.actor.ActorLogging +import akka.actor.ActorSystem import akka.actor.ActorRef import akka.actor.ActorSelection import akka.actor.Address @@ -21,6 +23,109 @@ import akka.cluster.ClusterEvent._ import akka.cluster.Member import akka.cluster.MemberStatus import akka.AkkaException +import akka.actor.NoSerializationVerificationNeeded + +object ClusterSingletonManagerSettings { + + /** + * Create settings from the default configuration + * `akka.cluster.singleton`. + */ + def apply(system: ActorSystem): ClusterSingletonManagerSettings = + apply(system.settings.config.getConfig("akka.cluster.singleton")) + + /** + * Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.singleton`. + */ + def apply(config: Config): ClusterSingletonManagerSettings = + new ClusterSingletonManagerSettings( + singletonName = config.getString("singleton-name"), + role = roleOption(config.getString("role")), + maxHandOverRetries = config.getInt("max-hand-over-retries"), + maxTakeOverRetries = config.getInt("max-take-over-retries"), + retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis) + + /** + * Java API: Create settings from the default configuration + * `akka.cluster.singleton`. + */ + def create(system: ActorSystem): ClusterSingletonManagerSettings = apply(system) + + /** + * Java API: Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.singleton`. + */ + def create(config: Config): ClusterSingletonManagerSettings = apply(config) + + /** + * INTERNAL API + */ + private[akka] def roleOption(role: String): Option[String] = + if (role == "") None else Option(role) + +} + +/** + * @param singletonName The actor name of the child singleton actor. + * + * @param role Singleton among the nodes tagged with specified role. + * If the role is not specified it's a singleton among all nodes in + * the cluster. + * + * @param maxHandOverRetries When a node is becoming oldest it sends + * hand-over request to previous oldest. This is retried with the + * `retryInterval` until the previous oldest confirms that the hand + * over has started, or this `maxHandOverRetries` limit has been + * reached. If the retry limit is reached it takes the decision to be + * the new oldest if previous oldest is unknown (typically removed), + * otherwise it initiates a new round by throwing + * [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting + * restart with fresh state. For a cluster with many members you might + * need to increase this retry limit because it takes longer time to + * propagate changes across all nodes. + * + * @param maxTakeOverRetries When a oldest node leaves the cluster it is + * not oldest any more and then it sends take over request to the new oldest to + * initiate the hand-over process. This is retried with the `retryInterval` until + * this retry limit has been reached. If the retry limit is reached it initiates + * a new round by throwing [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] + * and expecting restart with fresh state. This will also cause the singleton actor + * to be stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to + * ensure that new oldest doesn't start singleton actor before previous is + * stopped for certain corner cases. + * + * @param retryInterval Interval for hand over and take over messages + */ +final class ClusterSingletonManagerSettings( + val singletonName: String, + val role: Option[String], + val maxHandOverRetries: Int, + val maxTakeOverRetries: Int, + val retryInterval: FiniteDuration) extends NoSerializationVerificationNeeded { + + // to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases + require(maxTakeOverRetries < maxHandOverRetries, + s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]") + + def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name) + + def withRole(role: String): ClusterSingletonManagerSettings = copy(role = ClusterSingletonManagerSettings.roleOption(role)) + + def withRole(role: Option[String]) = copy(role = role) + + def withRetry(maxHandOverRetries: Int, maxTakeOverRetries: Int, retryInterval: FiniteDuration): ClusterSingletonManagerSettings = + copy(maxHandOverRetries = maxHandOverRetries, + maxTakeOverRetries = maxTakeOverRetries, + retryInterval = retryInterval) + + private def copy(singletonName: String = singletonName, + role: Option[String] = role, + maxHandOverRetries: Int = maxHandOverRetries, + maxTakeOverRetries: Int = maxTakeOverRetries, + retryInterval: FiniteDuration = retryInterval): ClusterSingletonManagerSettings = + new ClusterSingletonManagerSettings(singletonName, role, maxHandOverRetries, maxTakeOverRetries, retryInterval) +} object ClusterSingletonManager { @@ -29,39 +134,9 @@ object ClusterSingletonManager { */ def props( singletonProps: Props, - singletonName: String, terminationMessage: Any, - role: Option[String], - maxHandOverRetries: Int = 10, - maxTakeOverRetries: Int = 5, - retryInterval: FiniteDuration = 1.second): Props = - Props(classOf[ClusterSingletonManager], singletonProps, singletonName, terminationMessage, role, - maxHandOverRetries, maxTakeOverRetries, retryInterval).withDeploy(Deploy.local) - - /** - * Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]]. - */ - def props( - singletonProps: Props, - singletonName: String, - terminationMessage: Any, - role: String, - maxHandOverRetries: Int, - maxTakeOverRetries: Int, - retryInterval: FiniteDuration): Props = - props(singletonProps, singletonName, terminationMessage, - ClusterSingletonManager.Internal.roleOption(role), maxHandOverRetries, maxTakeOverRetries, retryInterval) - - /** - * Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]] - * with default values. - */ - def defaultProps( - singletonProps: Props, - singletonName: String, - terminationMessage: Any, - role: String): Props = - props(singletonProps, singletonName, terminationMessage, ClusterSingletonManager.Internal.roleOption(role)) + settings: ClusterSingletonManagerSettings): Props = + Props(new ClusterSingletonManager(singletonProps, terminationMessage, settings)).withDeploy(Deploy.local) /** * INTERNAL API @@ -132,11 +207,6 @@ object ClusterSingletonManager { val TakeOverRetryTimer = "take-over-retry" val CleanupTimer = "cleanup" - def roleOption(role: String): Option[String] = role match { - case null | "" ⇒ None - case _ ⇒ Some(role) - } - object OldestChangedBuffer { /** * Request to deliver one more event. @@ -292,13 +362,10 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * Use factory method [[ClusterSingletonManager#props]] to create the * [[akka.actor.Props]] for the actor. * - * ==Arguments== * - * '''''singletonProps''''' [[akka.actor.Props]] of the singleton actor instance. + * @param singletonProps [[akka.actor.Props]] of the singleton actor instance. * - * '''''singletonName''''' The actor name of the child singleton actor. - * - * '''''terminationMessage''''' When handing over to a new oldest node + * @param terminationMessage When handing over to a new oldest node * this `terminationMessage` is sent to the singleton actor to tell * it to finish its work, close resources, and stop. * The hand-over to the new oldest node is completed when the @@ -306,51 +373,18 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * Note that [[akka.actor.PoisonPill]] is a perfectly fine * `terminationMessage` if you only need to stop the actor. * - * '''''role''''' Singleton among the nodes tagged with specified role. - * If the role is not specified it's a singleton among all nodes in - * the cluster. - * - * '''''maxHandOverRetries''''' When a node is becoming oldest it sends - * hand-over request to previous oldest. This is retried with the - * `retryInterval` until the previous oldest confirms that the hand - * over has started, or this `maxHandOverRetries` limit has been - * reached. If the retry limit is reached it takes the decision to be - * the new oldest if previous oldest is unknown (typically removed), - * otherwise it initiates a new round by throwing - * [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting - * restart with fresh state. For a cluster with many members you might - * need to increase this retry limit because it takes longer time to - * propagate changes across all nodes. - * - * '''''maxTakeOverRetries''''' When a oldest node is not oldest any more - * it sends take over request to the new oldest to initiate the normal - * hand-over process. This is especially useful when new node joins and becomes - * oldest immediately, without knowing who was previous oldest. This is retried - * with the `retryInterval` until this retry limit has been reached. If the retry - * limit is reached it initiates a new round by throwing - * [[akka.cluster.singleton.ClusterSingletonManagerIsStuck]] and expecting - * restart with fresh state. This will also cause the singleton actor to be - * stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to - * ensure that new oldest doesn't start singleton actor before previous is - * stopped for certain corner cases. + * @param settings see [[ClusterSingletonManagerSettings]] */ class ClusterSingletonManager( singletonProps: Props, - singletonName: String, terminationMessage: Any, - role: Option[String], - maxHandOverRetries: Int, - maxTakeOverRetries: Int, - retryInterval: FiniteDuration) + settings: ClusterSingletonManagerSettings) extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] { - // to ensure that new oldest doesn't start singleton actor before previous is stopped for certain corner cases - require(maxTakeOverRetries < maxHandOverRetries, - s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]") - import ClusterSingletonManager._ import ClusterSingletonManager.Internal._ import ClusterSingletonManager.Internal.OldestChangedBuffer._ + import settings._ val cluster = Cluster(context.system) val selfAddressOption = Some(cluster.selfAddress) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index f6f37d424b..0a5564ea77 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -15,43 +15,76 @@ import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberExited import scala.concurrent.duration._ import scala.language.postfixOps +import com.typesafe.config.Config +import akka.actor.NoSerializationVerificationNeeded + +object ClusterSingletonProxySettings { + + /** + * Create settings from the default configuration + * `akka.cluster.singleton-proxy`. + */ + def apply(system: ActorSystem): ClusterSingletonProxySettings = + apply(system.settings.config.getConfig("akka.cluster.singleton-proxy")) + + /** + * Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.singleton-proxy`. + */ + def apply(config: Config): ClusterSingletonProxySettings = + new ClusterSingletonProxySettings( + role = roleOption(config.getString("role")), + singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis) + + /** + * Java API: Create settings from the default configuration + * `akka.cluster.singleton-proxy`. + */ + def create(system: ActorSystem): ClusterSingletonProxySettings = apply(system) + + /** + * Java API: Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.singleton-proxy`. + */ + def create(config: Config): ClusterSingletonProxySettings = apply(config) + + /** + * INTERNAL API + */ + private[akka] def roleOption(role: String): Option[String] = + if (role == "") None else Option(role) + +} + +/** + * @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do. + * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. + */ +final class ClusterSingletonProxySettings( + val role: Option[String], + val singletonIdentificationInterval: FiniteDuration) extends NoSerializationVerificationNeeded { + + def withRole(role: String): ClusterSingletonProxySettings = copy(role = ClusterSingletonProxySettings.roleOption(role)) + + def withRole(role: Option[String]): ClusterSingletonProxySettings = copy(role = role) + + def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings = + copy(singletonIdentificationInterval = singletonIdentificationInterval) + + private def copy(role: Option[String] = role, + singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval): ClusterSingletonProxySettings = + new ClusterSingletonProxySettings(role, singletonIdentificationInterval) +} object ClusterSingletonProxy { /** * Scala API: Factory method for `ClusterSingletonProxy` [[akka.actor.Props]]. * * @param singletonPath The logical path of the singleton, i.e., /user/singletonManager/singleton. - * @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do. - * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. - * @return The singleton proxy Props. + * @param settings see [[ClusterSingletonProxySettings]] */ - def props(singletonPath: String, role: Option[String], singletonIdentificationInterval: FiniteDuration = 1.second): Props = Props(classOf[ClusterSingletonProxy], singletonPath, role, singletonIdentificationInterval) - - /** - * Java API: Factory method for `ClusterSingletonProxy` [[akka.actor.Props]]. - * - * @param singletonPath The logical path of the singleton, i.e., /user/singletonManager/singleton. - * @param role The role of the cluster nodes where the singleton can be deployed. If null, then any node will do. - * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. - * @return The singleton proxy Props. - */ - def props(singletonPath: String, role: String, singletonIdentificationInterval: FiniteDuration): Props = - props(singletonPath, roleOption(role), singletonIdentificationInterval) - - /** - * Java API: Factory method for `ClusterSingletonProxy` [[akka.actor.Props]]. The interval at which the proxy will try - * to resolve the singleton instance is set to 1 second. - * - * @param singletonPath The logical path of the singleton, i.e., /user/singletonManager/singleton. - * @param role The role of the cluster nodes where the singleton can be deployed. If null, then any node will do. - * @return The singleton proxy Props. - */ - def defaultProps(singletonPath: String, role: String): Props = props(singletonPath, role, 1 second) - - private def roleOption(role: String): Option[String] = role match { - case null | "" ⇒ None - case _ ⇒ Some(role) - } + def props(singletonPath: String, settings: ClusterSingletonProxySettings): Props = + Props(new ClusterSingletonProxy(singletonPath, settings)).withDeploy(Deploy.local) private case object TryToIdentifySingleton @@ -74,16 +107,9 @@ object ClusterSingletonProxy { * * Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the * actors involved. - * - * @param singletonPathString The logical path of the singleton. This does not include the node address or actor system - * name, e.g., it can be something like /user/singletonManager/singleton. - * @param role Cluster role on which the singleton is deployed. This is required to keep track only of the members where - * the singleton can actually exist. - * @param singletonIdentificationInterval Periodicity at which the proxy sends the `Identify` message to the current - * singleton actor selection. */ -class ClusterSingletonProxy(singletonPathString: String, role: Option[String], singletonIdentificationInterval: FiniteDuration) extends Actor with Stash with ActorLogging { - +class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with Stash with ActorLogging { + import settings._ val singletonPath = singletonPathString.split("/") var identifyCounter = 0 var identifyId = createIdentifyId(identifyCounter) diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerChaosSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerChaosSpec.scala index 5da1515bac..0c166b9ea3 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerChaosSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerChaosSpec.scala @@ -79,9 +79,8 @@ class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonMan def createSingleton(): ActorRef = { system.actorOf(ClusterSingletonManager.props( singletonProps = Props(classOf[Echo], testActor), - singletonName = "echo", terminationMessage = PoisonPill, - role = None), + settings = ClusterSingletonManagerSettings(system).withSingletonName("echo")), name = "singleton") } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala index 6312ea0604..b3b03ca145 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala @@ -77,16 +77,15 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan def createSingleton(): ActorRef = { system.actorOf(ClusterSingletonManager.props( singletonProps = Props(classOf[Echo], testActor), - singletonName = "echo", terminationMessage = PoisonPill, - role = None), + settings = ClusterSingletonManagerSettings(system).withSingletonName("echo")), name = "singleton") } lazy val echoProxy: ActorRef = { system.actorOf(ClusterSingletonProxy.props( singletonPath = "/user/singleton/echo", - role = None), + settings = ClusterSingletonProxySettings(system)), name = "echoProxy") } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala index 66b7a29270..4178c5ef03 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala @@ -259,9 +259,9 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS //#create-singleton-manager system.actorOf(ClusterSingletonManager.props( singletonProps = Props(classOf[Consumer], queue, testActor), - singletonName = "consumer", terminationMessage = End, - role = Some("worker")), + settings = ClusterSingletonManagerSettings(system) + .withSingletonName("consumer").withRole("worker")), name = "singleton") //#create-singleton-manager } @@ -270,7 +270,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS //#create-singleton-proxy system.actorOf(ClusterSingletonProxy.props( singletonPath = "/user/singleton/consumer", - role = Some("worker")), + settings = ClusterSingletonProxySettings(system).withRole("worker")), name = "consumerProxy") //#create-singleton-proxy } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerStartupSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerStartupSpec.scala index 114cbaac79..d7c442bb09 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerStartupSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerStartupSpec.scala @@ -71,16 +71,15 @@ class ClusterSingletonManagerStartupSpec extends MultiNodeSpec(ClusterSingletonM def createSingleton(): ActorRef = { system.actorOf(ClusterSingletonManager.props( singletonProps = Props(classOf[Echo], testActor), - singletonName = "echo", terminationMessage = PoisonPill, - role = None), + settings = ClusterSingletonManagerSettings(system).withSingletonName("echo")), name = "singleton") } lazy val echoProxy: ActorRef = { system.actorOf(ClusterSingletonProxy.props( singletonPath = "/user/singleton/echo", - role = None), + settings = ClusterSingletonProxySettings(system)), name = "echoProxy") } diff --git a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java index 654ce00ee5..45ac45aa02 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java @@ -31,12 +31,16 @@ public class ClusterSingletonManagerTest { final ActorRef testActor = null; //#create-singleton-manager - system.actorOf(ClusterSingletonManager.defaultProps(Props.create(Consumer.class, queue, testActor), "consumer", - new End(), "worker"), "singleton"); + final ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(system) + .withSingletonName("consumer").withRole("worker"); + system.actorOf(ClusterSingletonManager.props(Props.create(Consumer.class, queue, testActor), + new End(), settings), "singleton"); //#create-singleton-manager //#create-singleton-proxy - system.actorOf(ClusterSingletonProxy.defaultProps("user/singleton/consumer", "worker"), "consumerProxy"); + ClusterSingletonProxySettings proxySettings = + ClusterSingletonProxySettings.create(system).withRole("worker"); + system.actorOf(ClusterSingletonProxy.props("user/singleton/consumer", proxySettings), "consumerProxy"); //#create-singleton-proxy } diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala index bbbd3cea10..659f2b55de 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonProxySpec.scala @@ -40,14 +40,14 @@ object ClusterSingletonProxySpec { cluster.registerOnMemberUp { system.actorOf(ClusterSingletonManager.props( singletonProps = Props[Singleton], - singletonName = "singleton", terminationMessage = PoisonPill, - role = None, - maxHandOverRetries = 5, - maxTakeOverRetries = 2), name = "singletonManager") + settings = ClusterSingletonManagerSettings(system) + .withRetry(maxHandOverRetries = 5, maxTakeOverRetries = 2, retryInterval = 1.second)), + name = "singletonManager") } - val proxy = system.actorOf(ClusterSingletonProxy.props("user/singletonManager/singleton", None), s"singletonProxy-${cluster.selfAddress.port.getOrElse(0)}") + val proxy = system.actorOf(ClusterSingletonProxy.props("user/singletonManager/singleton", + settings = ClusterSingletonProxySettings(system)), s"singletonProxy-${cluster.selfAddress.port.getOrElse(0)}") def testProxy(msg: String) { val probe = TestProbe() diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index f069abc62d..e9690f3469 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -217,3 +217,11 @@ named ``akka-cluster-sharding``. You need to replace this dependency if you use The classes changed package name from ``akka.contrib.pattern`` to ``akka.cluster.sharding``. The configuration properties changed name to ``akka.cluster.sharding``. + +ClusterSingletonManager and ClusterSingletonProxy construction +============================================================== + +Parameters to the ``Props`` factory methods have been moved to settings object ``ClusterSingletonManagerSettings` +and ``ClusterSingletonProxySettings``. These can be created from system configuration properties and also +amended with API as needed. + diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java index 8ae86a89d0..c620e45df6 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java @@ -7,7 +7,9 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; import akka.cluster.singleton.ClusterSingletonManager; +import akka.cluster.singleton.ClusterSingletonManagerSettings; import akka.cluster.singleton.ClusterSingletonProxy; +import akka.cluster.singleton.ClusterSingletonProxySettings; public class StatsSampleOneMasterMain { @@ -32,14 +34,18 @@ public class StatsSampleOneMasterMain { ActorSystem system = ActorSystem.create("ClusterSystem", config); //#create-singleton-manager - system.actorOf(ClusterSingletonManager.defaultProps( - Props.create(StatsService.class), "statsService", - PoisonPill.getInstance(), "compute"), "singleton"); + ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(system) + .withSingletonName("statsService").withRole("compute"); + system.actorOf(ClusterSingletonManager.props( + Props.create(StatsService.class), PoisonPill.getInstance(), settings), + "singleton"); //#create-singleton-manager //#singleton-proxy - system.actorOf(ClusterSingletonProxy.defaultProps("/user/singleton/statsService", - "compute"), "statsServiceProxy"); + ClusterSingletonProxySettings proxySettings = + ClusterSingletonProxySettings.create(system).withRole("compute"); + system.actorOf(ClusterSingletonProxy.props("/user/singleton/statsService", + proxySettings), "statsServiceProxy"); //#singleton-proxy } diff --git a/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index 9e8e0e5cf1..89c6b1ec13 100644 --- a/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster-java/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -15,11 +15,13 @@ import akka.cluster.MemberStatus import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberUp import akka.cluster.singleton.ClusterSingletonManager +import akka.cluster.singleton.ClusterSingletonManagerSettings import akka.cluster.singleton.ClusterSingletonProxy import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit.ImplicitSender import sample.cluster.stats.StatsMessages._ +import akka.cluster.singleton.ClusterSingletonProxySettings object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig { // register the named roles (nodes) of the test @@ -100,14 +102,14 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing Cluster(system).unsubscribe(testActor) - system.actorOf(ClusterSingletonManager.defaultProps( + system.actorOf(ClusterSingletonManager.props( Props[StatsService], - singletonName = "statsService", terminationMessage = PoisonPill, - role = null), name = "singleton") + settings = ClusterSingletonManagerSettings(system).withSingletonName("statsService")), + name = "singleton") - system.actorOf(ClusterSingletonProxy.defaultProps("/user/singleton/statsService", - "compute"), "statsServiceProxy"); + system.actorOf(ClusterSingletonProxy.props("/user/singleton/statsService", + ClusterSingletonProxySettings(system).withRole("compute")), "statsServiceProxy") testConductor.enter("all-up") } @@ -126,4 +128,4 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing } } -} \ No newline at end of file +} diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala index 49809e3eca..c6f28788a3 100644 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala @@ -5,7 +5,9 @@ import akka.actor.ActorSystem import akka.actor.PoisonPill import akka.actor.Props import akka.cluster.singleton.ClusterSingletonManager +import akka.cluster.singleton.ClusterSingletonManagerSettings import akka.cluster.singleton.ClusterSingletonProxy +import akka.cluster.singleton.ClusterSingletonProxySettings object StatsSampleOneMaster { def main(args: Array[String]): Unit = { @@ -29,14 +31,17 @@ object StatsSampleOneMaster { //#create-singleton-manager system.actorOf(ClusterSingletonManager.props( - singletonProps = Props[StatsService], singletonName = "statsService", - terminationMessage = PoisonPill, role = Some("compute")), + singletonProps = Props[StatsService], + terminationMessage = PoisonPill, + settings = ClusterSingletonManagerSettings(system) + .withSingletonName("statsService").withRole("compute")), name = "singleton") //#create-singleton-manager //#singleton-proxy system.actorOf(ClusterSingletonProxy.props(singletonPath = "/user/singleton/statsService", - role = Some("compute")), name = "statsServiceProxy") + settings = ClusterSingletonProxySettings(system).withRole("compute")), + name = "statsServiceProxy") //#singleton-proxy } } diff --git a/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index cb599ac59f..9b31858deb 100644 --- a/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster-scala/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -10,6 +10,7 @@ import akka.actor.PoisonPill import akka.actor.Props import akka.actor.RootActorPath import akka.cluster.singleton.ClusterSingletonManager +import akka.cluster.singleton.ClusterSingletonManagerSettings import akka.cluster.singleton.ClusterSingletonProxy import akka.cluster.Cluster import akka.cluster.Member @@ -19,6 +20,7 @@ import akka.cluster.ClusterEvent.MemberUp import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit.ImplicitSender +import akka.cluster.singleton.ClusterSingletonProxySettings object StatsSampleSingleMasterSpecConfig extends MultiNodeConfig { // register the named roles (nodes) of the test @@ -100,11 +102,14 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing Cluster(system).unsubscribe(testActor) system.actorOf(ClusterSingletonManager.props( - singletonProps = Props[StatsService], singletonName = "statsService", - terminationMessage = PoisonPill, role = Some("compute")), name = "singleton") + singletonProps = Props[StatsService], terminationMessage = PoisonPill, + settings = ClusterSingletonManagerSettings(system) + .withSingletonName("statsService").withRole("compute")), + name = "singleton") system.actorOf(ClusterSingletonProxy.props(singletonPath = "/user/singleton/statsService", - role = Some("compute")), name = "statsServiceProxy") + ClusterSingletonProxySettings(system).withRole("compute")), + name = "statsServiceProxy") testConductor.enter("all-up") } @@ -124,4 +129,4 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing } } -} \ No newline at end of file +}