diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 9002f3b044..e38ea1c3d4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -5,31 +5,20 @@ package akka.actor import akka.testkit.AkkaSpec -import DeploymentConfig._ import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions +import akka.routing._ object DeployerSpec { val deployerConf = ConfigFactory.parseString(""" akka.actor.deployment { /user/service1 { } - /user/service2 { - router = round-robin - nr-of-instances = 3 - remote { - nodes = ["wallace:2552", "gromit:2552"] - } - } /user/service3 { create-as { class = "akka.actor.DeployerSpec$RecipeActor" } } - /user/service-auto { - router = round-robin - nr-of-instances = auto - } /user/service-direct { router = direct } @@ -47,31 +36,6 @@ object DeployerSpec { /user/service-scatter-gather { router = scatter-gather } - /user/service-least-cpu { - router = least-cpu - } - /user/service-least-ram { - router = least-ram - } - /user/service-least-messages { - router = least-messages - } - /user/service-custom { - router = org.my.Custom - } - /user/service-cluster1 { - cluster { - preferred-nodes = ["node:wallace", "node:gromit"] - } - } - /user/service-cluster2 { - cluster { - preferred-nodes = ["node:wallace", "node:gromit"] - replication { - strategy = write-behind - } - } - } } """, ConfigParseOptions.defaults) @@ -94,9 +58,9 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { deployment must be(Some( Deploy( service, + deployment.get.config, None, - NoRouting, - NrOfInstances(1), + NoRouter, LocalScope))) } @@ -114,28 +78,14 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { deployment must be(Some( Deploy( service, + deployment.get.config, Some(ActorRecipe(classOf[DeployerSpec.RecipeActor])), - NoRouting, - NrOfInstances(1), - LocalScope))) - } - - "be able to parse 'akka.actor.deployment._' with number-of-instances=auto" in { - val service = "/user/service-auto" - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) - deployment must be('defined) - - deployment must be(Some( - Deploy( - service, - None, - RoundRobin, - AutoNrOfInstances, + NoRouter, LocalScope))) } "detect invalid number-of-instances" in { - intercept[akka.config.ConfigurationException] { + intercept[com.typesafe.config.ConfigException.WrongType] { val invalidDeployerConf = ConfigFactory.parseString(""" akka.actor.deployment { /user/service-invalid-number-of-instances { @@ -150,50 +100,35 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } "be able to parse 'akka.actor.deployment._' with direct router" in { - assertRouting(NoRouting, "/user/service-direct") + assertRouting(NoRouter, "/user/service-direct") } "ignore nr-of-instances with direct router" in { - assertRouting(NoRouting, "/user/service-direct2") + assertRouting(NoRouter, "/user/service-direct2") } "be able to parse 'akka.actor.deployment._' with round-robin router" in { - assertRouting(RoundRobin, "/user/service-round-robin") + assertRouting(RoundRobinRouter(1), "/user/service-round-robin") } "be able to parse 'akka.actor.deployment._' with random router" in { - assertRouting(Random, "/user/service-random") + assertRouting(RandomRouter(1), "/user/service-random") } "be able to parse 'akka.actor.deployment._' with scatter-gather router" in { - assertRouting(ScatterGather, "/user/service-scatter-gather") + assertRouting(ScatterGatherFirstCompletedRouter(1), "/user/service-scatter-gather") } - "be able to parse 'akka.actor.deployment._' with least-cpu router" in { - assertRouting(LeastCPU, "/user/service-least-cpu") - } - - "be able to parse 'akka.actor.deployment._' with least-ram router" in { - assertRouting(LeastRAM, "/user/service-least-ram") - } - - "be able to parse 'akka.actor.deployment._' with least-messages router" in { - assertRouting(LeastMessages, "/user/service-least-messages") - } - "be able to parse 'akka.actor.deployment._' with custom router" in { - assertRouting(CustomRouter("org.my.Custom"), "/user/service-custom") - } - - def assertRouting(expected: Routing, service: String) { + def assertRouting(expected: RouterConfig, service: String) { val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) deployment must be(Some( Deploy( service, + deployment.get.config, None, expected, - NrOfInstances(1), LocalScope))) } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index a62f6712f0..960dd06ae3 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -4,29 +4,31 @@ import akka.actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ CountDownLatch, TimeUnit } -import akka.testkit.AkkaSpec -import akka.actor.DeploymentConfig._ -import akka.routing.Routing.Broadcast -import akka.testkit.DefaultTimeout +import akka.testkit._ +import akka.util.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { +class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val deployer = system.asInstanceOf[ActorSystemImpl].provider.deployer + "RouterConfig" must { + + "be overridable in config" in { + deployer.deploy(Deploy("/config", null, None, RandomRouter(4), LocalScope)) + val actor = system.actorOf(Props(new Actor { + def receive = { + case "get" ⇒ sender ! context.props + } + }).withRouting(RoundRobinRouter(12)), "config") + actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4) + } + + } + "round robin router" must { "be able to shut down its instance" in { - val path = system / "round-robin-0" - - deployer.deploy( - Deploy( - path.toString, - None, - RoundRobin, - NrOfInstances(5), - LocalScope)) - val helloLatch = new CountDownLatch(5) val stopLatch = new CountDownLatch(5) @@ -38,7 +40,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { override def postStop() { stopLatch.countDown() } - }), path.name) + }).withRouting(RoundRobinRouter(5)), "round-robin-shutdown") actor ! "hello" actor ! "hello" @@ -52,16 +54,6 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { } "deliver messages in a round robin fashion" in { - val path = system / "round-robin-1" - - deployer.deploy( - Deploy( - path.toString, - None, - RoundRobin, - NrOfInstances(10), - LocalScope)) - val connectionCount = 10 val iterationCount = 10 val doneLatch = new CountDownLatch(connectionCount) @@ -69,7 +61,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { val counter = new AtomicInteger var replies = Map.empty[Int, Int] for (i ← 0 until connectionCount) { - replies = replies + (i -> 0) + replies += i -> 0 } val actor = system.actorOf(Props(new Actor { @@ -78,7 +70,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { case "hit" ⇒ sender ! id case "end" ⇒ doneLatch.countDown() } - }), path.name) + }).withRouting(RoundRobinRouter(connectionCount)), "round-robin") for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { @@ -92,20 +84,10 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { actor ! Broadcast("end") doneLatch.await(5, TimeUnit.SECONDS) must be(true) - replies.values foreach { _ must be(10) } + replies.values foreach { _ must be(iterationCount) } } "deliver a broadcast message using the !" in { - val path = system / "round-robin-2" - - deployer.deploy( - Deploy( - path.toString, - None, - RoundRobin, - NrOfInstances(5), - LocalScope)) - val helloLatch = new CountDownLatch(5) val stopLatch = new CountDownLatch(5) @@ -117,7 +99,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { override def postStop() { stopLatch.countDown() } - }), path.name) + }).withRouting(RoundRobinRouter(5)), "round-robin-broadcast") actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -130,27 +112,17 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { "random router" must { "be able to shut down its instance" in { - val path = system / "random-0" - - deployer.deploy( - Deploy( - path.toString, - None, - Random, - NrOfInstances(7), - LocalScope)) - val stopLatch = new CountDownLatch(7) val actor = system.actorOf(Props(new Actor { def receive = { - case "hello" ⇒ {} + case "hello" ⇒ sender ! "world" } override def postStop() { stopLatch.countDown() } - }), path.name) + }).withRouting(RandomRouter(7)), "random-shutdown") actor ! "hello" actor ! "hello" @@ -158,21 +130,15 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { actor ! "hello" actor ! "hello" + within(2 seconds) { + for (i ← 1 to 5) expectMsg("world") + } + actor.stop() stopLatch.await(5, TimeUnit.SECONDS) must be(true) } "deliver messages in a random fashion" in { - val path = system / "random-1" - - deployer.deploy( - Deploy( - path.toString, - None, - Random, - NrOfInstances(10), - LocalScope)) - val connectionCount = 10 val iterationCount = 10 val doneLatch = new CountDownLatch(connectionCount) @@ -189,7 +155,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { case "hit" ⇒ sender ! id case "end" ⇒ doneLatch.countDown() } - }), path.name) + }).withRouting(RandomRouter(connectionCount)), "random") for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { @@ -204,19 +170,10 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { doneLatch.await(5, TimeUnit.SECONDS) must be(true) replies.values foreach { _ must be > (0) } + replies.values.sum must be === iterationCount * connectionCount } "deliver a broadcast message using the !" in { - val path = system / "random-2" - - deployer.deploy( - Deploy( - path.toString, - None, - Random, - NrOfInstances(6), - LocalScope)) - val helloLatch = new CountDownLatch(6) val stopLatch = new CountDownLatch(6) @@ -228,7 +185,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { override def postStop() { stopLatch.countDown() } - }), path.name) + }).withRouting(RandomRouter(6)), "random-broadcast") actor ! Broadcast("hello") helloLatch.await(5, TimeUnit.SECONDS) must be(true) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 751092827e..0bb70b3ffc 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -3,7 +3,6 @@ package akka.routing import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import collection.mutable.LinkedList -import akka.routing.Routing.Broadcast import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ diff --git a/akka-actor/src/main/java/akka/routing/RouterFactory.java b/akka-actor/src/main/java/akka/routing/RouterFactory.java deleted file mode 100644 index 65ce7a10c6..0000000000 --- a/akka-actor/src/main/java/akka/routing/RouterFactory.java +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.routing; - -/** - * A Factory responsible for creating {@link Router} instances. It makes Java compatability possible for users that - * want to provide their own router instance. - */ -public interface RouterFactory { - - /** - * Creates a new Router instance. - * - * @return the newly created Router instance. - */ - Router newRouter(); -} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 0fd4f77a41..4c58a85b23 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -46,59 +46,32 @@ akka { deployment { - default { # deployment id pattern, e.g. /app/service-ping + default { # deployment id pattern, e.g. /app/service-ping - router = "direct" # routing (load-balance) scheme to use - # available: "direct", "round-robin", "random", "scatter-gather" - # "least-cpu", "least-ram", "least-messages" - # or: fully qualified class name of the router class - # default is "direct"; - # if 'replication' is used then the only available router is "direct" + router = "direct" # routing (load-balance) scheme to use + # available: "direct", "round-robin", "random", "scatter-gather" + # or: fully qualified class name of the router class + # default is "direct"; + # In case of non-direct routing, the actors to be routed to can be specified + # in several ways: + # - nr-of-instances: will create that many children given the actor factory + # supplied in the source code (overridable using create-as below) + # - target.paths: will look the paths up using actorFor and route to + # them, i.e. will not create children - nr-of-instances = 1 # number of actor instances in the cluster - # available: positive integer (1-N) or the string "auto" for auto-scaling - # default is '1' - # if the "direct" router is used then this element is ignored (always '1') + nr-of-instances = 1 # number of children to create in case of a non-direct router; this setting + # is ignored if target.paths is given - - # optional - create-as { # FIXME document 'create-as' - class = "" # fully qualified class name of recipe implementation + create-as { # FIXME document 'create-as' + class = "" # fully qualified class name of recipe implementation } - remote = "" # if this is set to a valid remote address, the named actor will be deployed at that node - target { - nodes = [] # A list of hostnames and ports for instantiating the remote actor instances - # The format should be on "hostname:port", where: - # - hostname can be either hostname or IP address the remote actor should connect to - # - port should be the port for the remote server on the other node - paths = [] # Alternatively you can specify the full paths of those actors which should be routed to + paths = [] # Alternatively to giving nr-of-instances you can specify the full paths of + # those actors which should be routed to. This setting takes precedence over + # nr-of-instances } - cluster { # defines the actor as a clustered actor - # default (if omitted) is local non-clustered actor - - preferred-nodes = [] # a list of preferred nodes for instantiating the actor instances on - # on format "host:", "ip:" or "node:" - - - # optional - replication { # use replication or not? only makes sense for a stateful actor - # serialize-mailbox not implemented, ticket #1412 - serialize-mailbox = off # should the actor mailbox be part of the serialized snapshot? - # default is 'off' - - storage = "transaction-log" # storage model for replication - # available: "transaction-log" and "data-grid" - # default is "transaction-log" - - strategy = "write-through" # guarantees for replication - # available: "write-through" and "write-behind" - # default is "write-through" - - } - } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b7a08e9946..caa813be3f 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -4,7 +4,6 @@ package akka.actor -import DeploymentConfig._ import akka.dispatch._ import akka.routing._ import akka.util.Duration diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 133f9e98bc..949f9e05a3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -520,13 +520,13 @@ class LocalActorRefProvider( def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef = { props.routerConfig match { - case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor - case routedActor ⇒ new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path) + case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor + case router ⇒ new RoutedActorRef(system, props.withRouting(adaptFromDeploy(router, path)), supervisor, path) } } private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = { - val lookupPath = p.elements.mkString("/", "/", "") + val lookupPath = p.elements.drop(1).mkString("/", "/", "") r.adaptFromDeploy(deployer.lookup(lookupPath)) } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index dc8adf5a08..6939ebf244 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -7,12 +7,20 @@ package akka.actor import collection.immutable.Seq import java.util.concurrent.ConcurrentHashMap import akka.event.Logging -import akka.actor.DeploymentConfig._ import akka.AkkaException import akka.config.ConfigurationException import akka.util.Duration import akka.event.EventStream import com.typesafe.config._ +import akka.routing._ + +case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) + +case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here + +trait Scope +case class LocalScope() extends Scope +case object LocalScope extends Scope /** * Deployer maps actor paths to actor deployments. @@ -40,48 +48,20 @@ class Deployer(val settings: ActorSystem.Settings) { import akka.util.ReflectiveAccess.getClassFor val deployment = config.withFallback(default) - // -------------------------------- - // akka.actor.deployment..router - // -------------------------------- - val router: Routing = deployment.getString("router") match { - case "round-robin" ⇒ RoundRobin - case "random" ⇒ Random - case "scatter-gather" ⇒ ScatterGather - case "least-cpu" ⇒ LeastCPU - case "least-ram" ⇒ LeastRAM - case "least-messages" ⇒ LeastMessages - case routerClassName ⇒ CustomRouter(routerClassName) + + val targets = deployment.getStringList("target.paths").asScala.toSeq + + val nrOfInstances = deployment.getInt("nr-of-instances") + + val router: RouterConfig = deployment.getString("router") match { + case "direct" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, targets) + case "random" ⇒ RandomRouter(nrOfInstances, targets) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, targets) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, targets) + case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) } - // -------------------------------- - // akka.actor.deployment..nr-of-instances - // -------------------------------- - val nrOfInstances = { - if (router == NoRouting) OneNrOfInstances - else { - def invalidNrOfInstances(wasValue: Any) = new ConfigurationException( - "Config option [akka.actor.deployment." + key + - ".nr-of-instances] needs to be either [\"auto\"] or [1-N] - was [" + - wasValue + "]") - - deployment.getAnyRef("nr-of-instances").asInstanceOf[Any] match { - case "auto" ⇒ AutoNrOfInstances - case 1 ⇒ OneNrOfInstances - case 0 ⇒ ZeroNrOfInstances - case nrOfReplicas: Number ⇒ - try { - new NrOfInstances(nrOfReplicas.intValue) - } catch { - case e: Exception ⇒ throw invalidNrOfInstances(nrOfReplicas) - } - case unknown ⇒ throw invalidNrOfInstances(unknown) - } - } - } - - // -------------------------------- - // akka.actor.deployment..create-as - // -------------------------------- val recipe: Option[ActorRecipe] = deployment.getString("create-as.class") match { case "" ⇒ None @@ -91,7 +71,7 @@ class Deployer(val settings: ActorSystem.Settings) { Some(ActorRecipe(implementationClass)) } - Some(Deploy(key, recipe, router, nrOfInstances, LocalScope)) + Some(Deploy(key, deployment, recipe, router, LocalScope)) } } diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala deleted file mode 100644 index 6b30ee351a..0000000000 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.actor - -import akka.util.Duration -import akka.routing.RouterType - -object DeploymentConfig { - - // -------------------------------- - // --- Deploy - // -------------------------------- - case class Deploy( - path: String, - recipe: Option[ActorRecipe], - routing: Routing = NoRouting, - nrOfInstances: NrOfInstances = ZeroNrOfInstances, - scope: Scope = LocalScope) - - // -------------------------------- - // --- Actor Recipe - // -------------------------------- - case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here - - // -------------------------------- - // --- Routing - // -------------------------------- - sealed trait Routing - case class CustomRouter(routerClassName: String) extends Routing - - // For Java API - case class NoRouting() extends Routing - case class RoundRobin() extends Routing - case class Random() extends Routing - case class ScatterGather() extends Routing - case class LeastCPU() extends Routing - case class LeastRAM() extends Routing - case class LeastMessages() extends Routing - - // For Scala API - case object NoRouting extends Routing - case object RoundRobin extends Routing - case object Random extends Routing - case object ScatterGather extends Routing - case object LeastCPU extends Routing - case object LeastRAM extends Routing - case object LeastMessages extends Routing - - // -------------------------------- - // --- Scope - // -------------------------------- - trait Scope - - // For Java API - case class LocalScope() extends Scope - - // For Scala API - case object LocalScope extends Scope - - // -------------------------------- - // --- Home - // -------------------------------- - sealed trait Home - // case class Host(hostName: String) extends Home - case class Node(nodeName: String) extends Home - // case class IP(ipAddress: String) extends Home - - // -------------------------------- - // --- Replicas - // -------------------------------- - - class NrOfInstances(val factor: Int) extends Serializable { - // note that -1 is used for AutoNrOfInstances - if (factor < -1) throw new IllegalArgumentException("nr-of-instances can not be negative") - override def hashCode = 0 + factor.## - override def equals(other: Any) = NrOfInstances.unapply(this) == NrOfInstances.unapply(other) - override def toString = if (factor == -1) "NrOfInstances(auto)" else "NrOfInstances(" + factor + ")" - } - - object NrOfInstances { - def apply(factor: Int): NrOfInstances = factor match { - case -1 ⇒ AutoNrOfInstances - case 0 ⇒ ZeroNrOfInstances - case 1 ⇒ OneNrOfInstances - case x ⇒ new NrOfInstances(x) - } - def unapply(other: Any) = other match { - case x: NrOfInstances ⇒ import x._; Some(factor) - case _ ⇒ None - } - } - - // For Java API - class AutoNrOfInstances extends NrOfInstances(-1) - class ZeroNrOfInstances extends NrOfInstances(0) - class OneNrOfInstances extends NrOfInstances(1) - - // For Scala API - case object AutoNrOfInstances extends AutoNrOfInstances - case object ZeroNrOfInstances extends ZeroNrOfInstances - case object OneNrOfInstances extends OneNrOfInstances - - // -------------------------------- - // --- Replication - // -------------------------------- - sealed trait ReplicationScheme - - // For Java API - case class Transient() extends ReplicationScheme - - // For Scala API - case object Transient extends ReplicationScheme - case class Replication( - storage: ReplicationStorage, - strategy: ReplicationStrategy) extends ReplicationScheme - - // -------------------------------- - // --- ReplicationStorage - // -------------------------------- - sealed trait ReplicationStorage - - // For Java API - case class TransactionLog() extends ReplicationStorage - case class DataGrid() extends ReplicationStorage - - // For Scala API - case object TransactionLog extends ReplicationStorage - case object DataGrid extends ReplicationStorage - - // -------------------------------- - // --- ReplicationStrategy - // -------------------------------- - sealed trait ReplicationStrategy - - // For Java API - sealed class WriteBehind extends ReplicationStrategy - sealed class WriteThrough extends ReplicationStrategy - - // For Scala API - case object WriteBehind extends WriteBehind - case object WriteThrough extends WriteThrough - - // -------------------------------- - // --- Helper methods for parsing - // -------------------------------- - - def nodeNameFor(home: Home): String = home match { - case Node(nodename) ⇒ nodename - // case Host("localhost") ⇒ Config.nodename - // case IP("0.0.0.0") ⇒ Config.nodename - // case IP("127.0.0.1") ⇒ Config.nodename - // case Host(hostname) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]") - // case IP(address) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]") - } - - def routerTypeFor(routing: Routing): RouterType = routing match { - case _: NoRouting | NoRouting ⇒ RouterType.NoRouter - case _: RoundRobin | RoundRobin ⇒ RouterType.RoundRobin - case _: Random | Random ⇒ RouterType.Random - case _: ScatterGather | ScatterGather ⇒ RouterType.ScatterGather - case _: LeastCPU | LeastCPU ⇒ RouterType.LeastCPU - case _: LeastRAM | LeastRAM ⇒ RouterType.LeastRAM - case _: LeastMessages | LeastMessages ⇒ RouterType.LeastMessages - case CustomRouter(implClass) ⇒ RouterType.Custom(implClass) - } - - def isReplicated(replicationScheme: ReplicationScheme): Boolean = - isReplicatedWithTransactionLog(replicationScheme) || - isReplicatedWithDataGrid(replicationScheme) - - def isWriteBehindReplication(replicationScheme: ReplicationScheme): Boolean = replicationScheme match { - case _: Transient | Transient ⇒ false - case Replication(_, strategy) ⇒ - strategy match { - case _: WriteBehind | WriteBehind ⇒ true - case _: WriteThrough | WriteThrough ⇒ false - } - } - - def isWriteThroughReplication(replicationScheme: ReplicationScheme): Boolean = replicationScheme match { - case _: Transient | Transient ⇒ false - case Replication(_, strategy) ⇒ - strategy match { - case _: WriteBehind | WriteBehind ⇒ true - case _: WriteThrough | WriteThrough ⇒ false - } - } - - def isReplicatedWithTransactionLog(replicationScheme: ReplicationScheme): Boolean = replicationScheme match { - case _: Transient | Transient ⇒ false - case Replication(storage, _) ⇒ - storage match { - case _: TransactionLog | TransactionLog ⇒ true - case _: DataGrid | DataGrid ⇒ throw new UnsupportedOperationException("ReplicationStorage 'DataGrid' is no supported yet") - } - } - - def isReplicatedWithDataGrid(replicationScheme: ReplicationScheme): Boolean = replicationScheme match { - case _: Transient | Transient ⇒ false - case Replication(storage, _) ⇒ - storage match { - case _: TransactionLog | TransactionLog ⇒ false - case _: DataGrid | DataGrid ⇒ throw new UnsupportedOperationException("ReplicationStorage 'DataGrid' is no supported yet") - } - } - -} - -/** - * Module holding the programmatic deployment configuration classes. - * Defines the deployment specification. - * Most values have defaults and can be left out. - * - * @author Jonas Bonér - */ -class DeploymentConfig(val nodename: String) { - - import DeploymentConfig._ - - case class ClusterScope(preferredNodes: Iterable[Home] = Vector(Node(nodename)), replication: ReplicationScheme = Transient) extends Scope - - def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == nodename) - - def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match { - case Deploy(_, _, _, _, ClusterScope(_, replicationScheme)) ⇒ Some(replicationScheme) - case _ ⇒ None - } - - def isReplicated(deployment: Deploy): Boolean = replicationSchemeFor(deployment) match { - case Some(replicationScheme) ⇒ DeploymentConfig.isReplicated(replicationScheme) - case _ ⇒ false - } - -} diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala deleted file mode 100644 index 98a9f9f188..0000000000 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ /dev/null @@ -1,580 +0,0 @@ -/** - * Copyright (C) 2009-2010 Typesafe Inc. - */ - -package akka.cluster - -import akka.actor._ -import DeploymentConfig._ -import akka.dispatch.Future -import akka.routing._ -import akka.serialization.Serializer -import akka.cluster.metrics._ -import akka.util.Duration -import akka.util.duration._ -import akka.AkkaException - -import com.eaio.uuid.UUID - -import java.net.InetSocketAddress -import java.util.concurrent.{ ConcurrentSkipListSet } - -class ClusterException(message: String) extends AkkaException(message) - -object ChangeListener { - - /** - * Cluster membership change listener. - * For Scala API. - */ - trait ChangeListener { - def notify(event: ChangeNotification, client: ClusterNode) { - event match { - case NodeConnected(name) ⇒ nodeConnected(name, client) - case NodeDisconnected(name) ⇒ nodeDisconnected(name, client) - case NewLeader(name: String) ⇒ newLeader(name, client) - case NewSession ⇒ thisNodeNewSession(client) - case ThisNode.Connected ⇒ thisNodeConnected(client) - case ThisNode.Disconnected ⇒ thisNodeDisconnected(client) - case ThisNode.Expired ⇒ thisNodeExpired(client) - } - } - - def nodeConnected(node: String, client: ClusterNode) {} - - def nodeDisconnected(node: String, client: ClusterNode) {} - - def newLeader(name: String, client: ClusterNode) {} - - def thisNodeNewSession(client: ClusterNode) {} - - def thisNodeConnected(client: ClusterNode) {} - - def thisNodeDisconnected(client: ClusterNode) {} - - def thisNodeExpired(client: ClusterNode) {} - } - - /** - * Cluster membership change listener. - * For Java API. - */ - abstract class ChangeListenerAdapter extends ChangeListener - - sealed trait ChangeNotification - - case class NodeConnected(node: String) extends ChangeNotification - - case class NodeDisconnected(node: String) extends ChangeNotification - - case class NewLeader(name: String) extends ChangeNotification - - case object NewSession extends ChangeNotification - - object ThisNode { - - case object Connected extends ChangeNotification - - case object Disconnected extends ChangeNotification - - case object Expired extends ChangeNotification - - } -} - -/** - * Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance. - * - * @author Jonas Bonér - */ -class NodeAddress(val clusterName: String, val nodeName: String) { - if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string") - if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string") - - override def toString = "%s:%s".format(clusterName, nodeName) - - override def hashCode = 0 + clusterName.## + nodeName.## - - override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other) -} - -/** - * NodeAddress companion object and factory. - */ -object NodeAddress { - def apply(clusterName: String, nodeName: String): NodeAddress = new NodeAddress(clusterName, nodeName) - def apply(system: ActorSystem): NodeAddress = new NodeAddress(system.clustername, system.nodename) - - def unapply(other: Any) = other match { - case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName)) - case _ ⇒ None - } -} - -/* - * Allows user to access metrics of a different nodes in the cluster. Changing metrics can be monitored - * using {@link MetricsAlterationMonitor} - * Metrics of the cluster nodes are distributed through ZooKeeper. For better performance, metrics are - * cached internally, and refreshed from ZooKeeper after an interval - */ -trait NodeMetricsManager { - - /* - * Gets metrics of a local node directly from JMX monitoring beans/Hyperic Sigar - */ - def getLocalMetrics: NodeMetrics - - /* - * Gets metrics of a specified node - * @param nodeName metrics of the node specified by the name will be returned - * @param useCached if true, returns metrics cached in the metrics manager, - * gets metrics directly from ZooKeeper otherwise - */ - def getMetrics(nodeName: String, useCached: Boolean = true): Option[NodeMetrics] - - /* - * Gets cached metrics of all nodes in the cluster - */ - def getAllMetrics: Array[NodeMetrics] - - /* - * Adds monitor that reacts, when specific conditions are satisfied - */ - def addMonitor(monitor: MetricsAlterationMonitor): Unit - - /* - * Removes monitor - */ - def removeMonitor(monitor: MetricsAlterationMonitor): Unit - - /* - * Removes metrics of s specified node from ZooKeeper and metrics manager cache - */ - def removeNodeMetrics(nodeName: String): Unit - - /* - * Sets timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper - */ - def refreshTimeout_=(newValue: Duration): Unit - - /* - * Timeout after which metrics, cached in the metrics manager, will be refreshed from ZooKeeper - */ - def refreshTimeout: Duration - - /* - * Starts metrics manager. When metrics manager is started, it refreshes cache from ZooKeeper - * after refreshTimeout, and invokes plugged monitors - */ - def start(): NodeMetricsManager - - /* - * Stops metrics manager. Stopped metrics manager doesn't refresh cache from ZooKeeper, - * and doesn't invoke plugged monitors - */ - def stop(): Unit - - /* - * If the value is true, metrics manages is started and running. Stopped, otherwise - */ - def isRunning: Boolean - -} - -/** - * Interface for cluster node. - * - * @author Jonas Bonér - */ -trait ClusterNode { - import ChangeListener._ - - private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() - - def membershipNodes: Array[String] - - def nodeAddress: NodeAddress - - def zkServerAddresses: String - - def start() - - def shutdown() - - def isShutdown: Boolean - - def disconnect(): ClusterNode - - def reconnect(): ClusterNode - - def metricsManager: NodeMetricsManager - - /** - * Registers a cluster change listener. - */ - def register(listener: ChangeListener): ClusterNode - - /** - * Returns the name of the current leader. - */ - def leader: String - - /** - * Returns true if 'this' node is the current leader. - */ - def isLeader: Boolean - - /** - * Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op. - */ - def resign() - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](actorAddress: String, actorClass: Class[T], serializer: Serializer): ClusterNode - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, serializer: Serializer): ClusterNode - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](actorAddress: String, actorClass: Class[T], serializeMailbox: Boolean, serializer: Serializer): ClusterNode - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode - - /** - * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store[T <: Actor](address: String, actorClass: Class[T], nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, serializer: Serializer): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, nrOfInstances: Int, serializer: Serializer): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - - /** - * Needed to have reflection through structural typing work. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, nrOfInstances: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode - - /** - * Needed to have reflection through structural typing work. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, nrOfInstances: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode - - /** - * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated - * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly - * available durable store. - */ - def store(actorAddress: String, actorFactory: () ⇒ ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode - - /** - * Removes actor from the cluster. - */ - // def remove(actorRef: ActorRef) - - /** - * Removes actor with address from the cluster. - */ - // def remove(address: String): ClusterNode - - /** - * Is the actor with uuid clustered or not? - */ - def isClustered(actorAddress: String): Boolean - - /** - * Is the actor with uuid in use on 'this' node or not? - */ - def isInUseOnNode(actorAddress: String): Boolean - - /** - * Is the actor with uuid in use or not? - */ - def isInUseOnNode(actorAddress: String, nodeName: String): Boolean - - /** - * Is the actor with uuid in use or not? - */ - def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean - - /** - * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available - * for remote access through lookup by its UUID. - */ - def use[T <: Actor](actorAddress: String): Option[LocalActorRef] - - /** - * Using (checking out) actor on a specific set of nodes. - */ - def useActorOnNodes(nodes: Array[String], actorAddress: String, replicateFromUuid: Option[UUID]) - - /** - * Using (checking out) actor on all nodes in the cluster. - */ - def useActorOnAllNodes(actorAddress: String, replicateFromUuid: Option[UUID]) - - /** - * Using (checking out) actor on a specific node. - */ - def useActorOnNode(node: String, actorAddress: String, replicateFromUuid: Option[UUID]) - - /** - * Checks in an actor after done using it on this node. - */ - def release(actorRef: ActorRef) - - /** - * Checks in an actor after done using it on this node. - */ - def release(actorAddress: String) - - /** - * Creates an ActorRef with a Router to a set of clustered actors. - */ - def ref(actorAddress: String, router: RouterType): ActorRef - - /** - * Returns the addresses of all actors checked out on this node. - */ - def addressesForActorsInUse: Array[String] - - /** - * Returns the addresses of all actors registered in this cluster. - */ - def addressesForClusteredActors: Array[String] - - /** - * Returns the addresses of all actors in use registered on a specific node. - */ - def addressesForActorsInUseOnNode(nodeName: String): Array[String] - - /** - * Returns Serializer for actor with UUID. - */ - def serializerForActor(actorAddress: String): Serializer - - /** - * Returns home address for actor with UUID. - */ - def inetSocketAddressesForActor(actorAddress: String): Array[(UUID, InetSocketAddress)] - - /** - * Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument). - */ - def send(f: Function0[Unit], nrOfInstances: Int) - - /** - * Send a function 'Function0[Any]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument). - * Returns an 'Array' with all the 'Future's from the computation. - */ - def send(f: Function0[Any], nrOfInstances: Int): List[Future[Any]] - - /** - * Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument) - * with the argument speficied. - */ - def send(f: Function1[Any, Unit], arg: Any, nrOfInstances: Int) - - /** - * Send a function 'Function1[Any, Any]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument) - * with the argument speficied. - * Returns an 'Array' with all the 'Future's from the computation. - */ - def send(f: Function1[Any, Any], arg: Any, nrOfInstances: Int): List[Future[Any]] - - /** - * Stores a configuration element under a specific key. - * If the key already exists then it will be overwritten. - */ - def setConfigElement(key: String, bytes: Array[Byte]) - - /** - * Returns the config element for the key or NULL if no element exists under the key. - * Returns Some(element) if it exists else None - */ - def getConfigElement(key: String): Option[Array[Byte]] - - /** - * Removes configuration element for a specific key. - * Does nothing if the key does not exist. - */ - def removeConfigElement(key: String) - - /** - * Returns a list with all config element keys. - */ - def getConfigElementKeys: Array[String] - - // =============== PRIVATE METHODS =============== - - // FIXME BAD BAD BAD - considering moving all these private[cluster] methods to a separate trait to get them out of the user's view - - private[cluster] def remoteClientLifeCycleHandler: ActorRef - - private[cluster] def remoteDaemon: ActorRef - - /** - * Removes actor with uuid from the cluster. - */ - // private[cluster] def remove(uuid: UUID) - - /** - * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'. - */ - private[cluster] def releaseActorOnAllNodes(actorAddress: String) - - /** - * Returns the UUIDs of all actors checked out on this node. - */ - private[cluster] def uuidsForActorsInUse: Array[UUID] - - /** - * Returns the UUIDs of all actors registered in this cluster. - */ - private[cluster] def uuidsForClusteredActors: Array[UUID] - - /** - * Returns the actor id for the actor with a specific UUID. - */ - private[cluster] def actorAddressForUuid(uuid: UUID): Option[String] - - /** - * Returns the actor ids for all the actors with a specific UUID. - */ - private[cluster] def actorAddressForUuids(uuids: Array[UUID]): Array[String] - - /** - * Returns the actor UUIDs for actor ID. - */ - private[cluster] def uuidsForActorAddress(actorAddress: String): Array[UUID] - - /** - * Returns the UUIDs of all actors in use registered on a specific node. - */ - private[cluster] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] - - private[cluster] def boot() - - private[cluster] def publish(change: ChangeNotification) - - private[cluster] def joinCluster() - - private[cluster] def joinLeaderElection: Boolean - - private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) - - private[cluster] def migrateActorsOnFailedNodes( - failedNodes: List[String], - currentClusterNodes: List[String], - oldClusterNodes: List[String], - disconnectedConnections: Map[String, InetSocketAddress]) - - private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster( - newlyConnectedMembershipNodes: Traversable[String], - newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] - - private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress] - - private[cluster] def membershipPathFor(node: String): String - private[cluster] def configurationPathFor(key: String): String - - private[cluster] def actorAddressToNodesPathFor(actorAddress: String): String - private[cluster] def actorAddressToNodesPathFor(actorAddress: String, nodeName: String): String - - private[cluster] def nodeToUuidsPathFor(node: String): String - private[cluster] def nodeToUuidsPathFor(node: String, uuid: UUID): String - - private[cluster] def actorAddressRegistryPathFor(actorAddress: String): String - private[cluster] def actorAddressRegistrySerializerPathFor(actorAddress: String): String - private[cluster] def actorAddressRegistryUuidPathFor(actorAddress: String): String - - private[cluster] def actorUuidRegistryPathFor(uuid: UUID): String - private[cluster] def actorUuidRegistryNodePathFor(uuid: UUID): String - private[cluster] def actorUuidRegistryAddressPathFor(uuid: UUID): String - - private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String -} - diff --git a/akka-actor/src/main/scala/akka/cluster/metrics/MetricsAlterationMonitor.scala b/akka-actor/src/main/scala/akka/cluster/metrics/MetricsAlterationMonitor.scala deleted file mode 100644 index 7f16601e3a..0000000000 --- a/akka-actor/src/main/scala/akka/cluster/metrics/MetricsAlterationMonitor.scala +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.cluster.metrics - -/* - * {@link NodeMetricsManager} periodically refershes internal cache with node metrics from MBeans / Sigar. - * Every time local cache is refreshed, monitors plugged to the metrics manager are invoked. - * If updated metrics satisfy conditions, specified in reactsOn, - * react is called - * - * @exampl {{{ - * class PeakCPULoadMonitor extends LocalMetricsAlterationMonitor { - * val id = "peak-cpu-load-monitor" - * - * def reactsOn(metrics: NodeMetrics) = - * metrics.systemLoadAverage > 0.8 - * - * def react(metrics: NodeMetrics) = - * println("Peak average system load at node [%s] is reached!" format (metrics.nodeName)) - * } - * }}} - * - */ -trait LocalMetricsAlterationMonitor extends MetricsAlterationMonitor { - - /* - * Definies conditions that must be satisfied in order to react on the changed metrics - */ - def reactsOn(metrics: NodeMetrics): Boolean - - /* - * Reacts on the changed metrics - */ - def react(metrics: NodeMetrics): Unit - -} - -/* - * {@link NodeMetricsManager} periodically refershes internal cache with metrics of all nodes in the cluster - * from ZooKeeper. Every time local cache is refreshed, monitors plugged to the metrics manager are invoked. - * If updated metrics satisfy conditions, specified in reactsOn, - * react is called - * - * @exampl {{{ - * class PeakCPULoadReached extends ClusterMetricsAlterationMonitor { - * val id = "peak-cpu-load-reached" - * - * def reactsOn(metrics: Array[NodeMetrics]) = - * metrics.forall(_.systemLoadAverage > 0.8) - * - * def react(metrics: Array[NodeMetrics]) = - * println("One of the nodes in the scluster has reached the peak system load!") - * } - * }}} - * - */ -trait ClusterMetricsAlterationMonitor extends MetricsAlterationMonitor { - - /* - * Definies conditions that must be satisfied in order to react on the changed metrics - */ - def reactsOn(allMetrics: Array[NodeMetrics]): Boolean - - /* - * Reacts on the changed metrics - */ - def react(allMetrics: Array[NodeMetrics]): Unit - -} - -sealed trait MetricsAlterationMonitor extends Comparable[MetricsAlterationMonitor] { - - /* - * Unique identiifier of the monitor - */ - def id: String - - def compareTo(otherMonitor: MetricsAlterationMonitor) = id.compareTo(otherMonitor.id) - -} diff --git a/akka-actor/src/main/scala/akka/cluster/metrics/NodeMetrics.scala b/akka-actor/src/main/scala/akka/cluster/metrics/NodeMetrics.scala deleted file mode 100644 index a292035c6a..0000000000 --- a/akka-actor/src/main/scala/akka/cluster/metrics/NodeMetrics.scala +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.cluster.metrics - -/* - * Snapshot of the JVM / system that's the node is running on - */ -trait NodeMetrics { - - /* - * Name of the node the metrics are gathered at - */ - def nodeName: String - - /* - * Amount of heap memory currently used - */ - def usedHeapMemory: Long - - /* - * Amount of heap memory guaranteed to be available - */ - def committedHeapMemory: Long - - /* - * Maximum amount of heap memory that can be used - */ - def maxHeapMemory: Long - - /* - * Number of the processors avalable to the JVM - */ - def avaiableProcessors: Int - - /* - * If OS-specific Hyperic Sigar library is plugged, it's used to calculate - * average load on the CPUs in the system. Otherwise, value is retreived from monitoring MBeans. - * Hyperic Sigar provides more precise values, and, thus, if the library is provided, it's used by default. - */ - def systemLoadAverage: Double - -} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 034993bd22..75c17a361d 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1,7 +1,6 @@ /** * Copyright (C) 2009-2011 Typesafe Inc. */ - package akka.routing import akka.actor._ @@ -9,88 +8,12 @@ import akka.actor._ import akka.japi.Creator import java.lang.reflect.InvocationTargetException import akka.config.ConfigurationException -import akka.actor.DeploymentConfig.Deploy import java.util.concurrent.atomic.AtomicInteger import akka.util.ReflectiveAccess import akka.AkkaException import scala.collection.JavaConversions._ -import akka.routing.Routing.{ Destination, Broadcast } import java.util.concurrent.TimeUnit -sealed trait RouterType - -/** - * Used for declarative configuration of Routing. - * - * @author Jonas Bonér - */ -object RouterType { - - /** - * A RouterType that indicates no routing - i.e. direct message. - */ - object NoRouter extends RouterType - - /** - * A RouterType that randomly selects a connection to send a message to. - */ - object Random extends RouterType - - /** - * A RouterType that selects the connection by using round robin. - */ - object RoundRobin extends RouterType - - /** - * A RouterType that broadcasts the messages to all connections. - */ - object Broadcast extends RouterType - - /** - * A RouterType that selects the connection by using scatter gather. - */ - object ScatterGather extends RouterType - - /** - * A RouterType that selects the connection based on the least amount of cpu usage - */ - object LeastCPU extends RouterType - - /** - * A RouterType that select the connection based on the least amount of ram used. - */ - object LeastRAM extends RouterType - - /** - * A RouterType that select the connection where the actor has the least amount of messages in its mailbox. - */ - object LeastMessages extends RouterType - - /** - * A user-defined custom RouterType. - */ - case class Custom(implClass: String) extends RouterType -} - -/** - * An {@link AkkaException} thrown when something goes wrong while routing a message - */ -class RoutingException(message: String) extends AkkaException(message) - -/** - * Contains the configuration to create local and clustered routed actor references. - * Routed ActorRef configuration object, this is thread safe and fully sharable. - */ -case class RoutedProps private[akka] ( - routerFactory: () ⇒ Router, - connectionManager: ConnectionManager) { - - // Java API - def this(creator: Creator[Router], connectionManager: ConnectionManager) { - this(() ⇒ creator.create(), connectionManager) - } -} - /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to * send a message to on (or more) of these actors. @@ -98,13 +21,13 @@ case class RoutedProps private[akka] ( private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath) extends LocalActorRef( _system, - _props.copy(creator = _props.routerConfig), + _props.copy(creator = () ⇒ _props.routerConfig.createActor()), _supervisor, _path) { - val route: Routing.Route = _props.routerConfig.createRoute(_props.creator, actorContext) + val route: Route = _props.routerConfig.createRoute(_props.creator, actorContext) - override def !(message: Any)(implicit sender: ActorRef = null) { + override def !(message: Any)(implicit sender: ActorRef = null): Unit = { val s = if (sender eq null) underlying.system.deadLetters else sender val msg = message match { @@ -119,68 +42,76 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup } } -trait RouterConfig extends Function0[Actor] { - def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig - - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Routing.Route -} - /** - * A Router is responsible for sending a message to one (or more) of its connections. + * This trait represents a router factory: it produces the actual router actor + * and creates the routing table (a function which determines the recipients + * for each message which is to be dispatched). The resulting RoutedActorRef + * optimizes the sending of the message so that it does NOT go through the + * router’s mailbox unless the route returns an empty recipient set. * - * @author Jonas Bonér + * '''Caution:''' This means + * that the route function is evaluated concurrently without protection by + * the RoutedActorRef: either provide a reentrant (i.e. pure) implementation or + * do the locking yourself! + * + * '''Caution:''' Please note that the [[akka.routing.Router]] which needs to + * be returned by `apply()` should not send a message to itself in its + * constructor or `preStart()` or publish its self reference from there: if + * someone tries sending a message to that reference before the constructor of + * RoutedActorRef has returned, there will be a `NullPointerException`! */ -trait Router { - def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[ActorRef]): Vector[ActorRef] = (nrOfInstances, targets) match { - case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) - case (_, xs) ⇒ Vector.empty[ActorRef] ++ xs - } -} +trait RouterConfig { -/** - * A Helper class to create actor references that use routing. - */ -object Routing { + def createActor(): Router = new Router {} - sealed trait RoutingMessage - - /** - * Used to broadcast a message to all connections in a router. E.g. every connection gets the message - * regardless of their routing algorithm. - */ - case class Broadcast(message: Any) extends RoutingMessage - - def createCustomRouter(implClass: String): Router = { - ReflectiveAccess.createInstance(implClass, Array[Class[_]](), Array[AnyRef]()) match { - case Right(router) ⇒ router.asInstanceOf[Router] - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - - throw new ConfigurationException("Could not instantiate custom Router of [" + - implClass + "] due to: " + cause, cause) + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { + deploy match { + case Some(Deploy(_, _, _, NoRouter, _)) ⇒ this + case Some(Deploy(_, _, _, r, _)) ⇒ r + case _ ⇒ this } } - case class Destination(sender: ActorRef, recipient: ActorRef) - type Route = (ActorRef, Any) ⇒ Iterable[Destination] + def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Route + + protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[String]): Vector[ActorRef] = (nrOfInstances, targets) match { + case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) + case (_, xs) ⇒ Vector.empty[ActorRef] ++ xs.map(context.actorFor(_)) + } } +/** + * Base trait for `Router` actors. Override `receive` to handle custom + * messages which the corresponding [[akka.actor.RouterConfig]] lets + * through by returning an empty route. + */ +trait Router extends Actor { + def receive = { + case _ ⇒ + } +} + +/** + * Used to broadcast a message to all connections in a router; only the + * contained message will be forwarded, i.e. the `Broadcast(...)` + * envelope will be stripped off. + * + * Router implementations may choose to handle this message differently. + */ +case class Broadcast(message: Any) + /** * Routing configuration that indicates no routing. * Oxymoron style. */ case object NoRouter extends RouterConfig { - def adaptFromDeploy(deploy: Option[Deploy]) = null - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null - - def apply(): Actor = null } +object RoundRobinRouter { + def apply(targets: Iterable[ActorRef]) = new RoundRobinRouter(targets = targets map (_.path.toString)) +} /** * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. *
@@ -192,8 +123,7 @@ case object NoRouter extends RouterConfig { * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) - extends Router with RouterConfig { +case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { /** * Constructor that sets nrOfInstances to be created. @@ -207,26 +137,11 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] * Constructor that sets the targets to be used. * Java API */ - def this(t: java.util.Collection[ActorRef]) = { + def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } - def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { - deploy match { - case Some(d) ⇒ - // In case there is a config then use this over any programmed settings. - copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) - case _ ⇒ this - } - } - - def apply(): Actor = new Actor { - def receive = { - case _ ⇒ - } - } - - def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { val routees: Vector[ActorRef] = createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) @@ -246,6 +161,9 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] } } +object RandomRouter { + def apply(targets: Iterable[ActorRef]) = new RandomRouter(targets = targets map (_.path.toString)) +} /** * A Router that randomly selects one of the target connections to send a message to. *
@@ -257,8 +175,7 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) - extends Router with RouterConfig { +case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { /** * Constructor that sets nrOfInstances to be created. @@ -272,32 +189,17 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni * Constructor that sets the targets to be used. * Java API */ - def this(t: java.util.Collection[ActorRef]) = { + def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } - def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { - deploy match { - case Some(d) ⇒ - // In case there is a config then use this over any programmed settings. - copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) - case _ ⇒ this - } - } - - def apply(): Actor = new Actor { - def receive = { - case _ ⇒ - } - } - import java.security.SecureRandom private val random = new ThreadLocal[SecureRandom] { override def initialValue = SecureRandom.getInstance("SHA1PRNG") } - def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { val routees: Vector[ActorRef] = createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) @@ -315,6 +217,9 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni } } +object BroadcastRouter { + def apply(targets: Iterable[ActorRef]) = new BroadcastRouter(targets = targets map (_.path.toString)) +} /** * A Router that uses broadcasts a message to all its connections. *
@@ -326,8 +231,7 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) - extends Router with RouterConfig { +case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { /** * Constructor that sets nrOfInstances to be created. @@ -341,26 +245,11 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = * Constructor that sets the targets to be used. * Java API */ - def this(t: java.util.Collection[ActorRef]) = { + def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } - def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { - deploy match { - case Some(d) ⇒ - // In case there is a config then use this over any programmed settings. - copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) - case _ ⇒ this - } - } - - def apply(): Actor = new Actor { - def receive = { - case _ ⇒ - } - } - - def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { val routees: Vector[ActorRef] = createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) @@ -374,6 +263,9 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = } } +object ScatterGatherFirstCompletedRouter { + def apply(targets: Iterable[ActorRef]) = new ScatterGatherFirstCompletedRouter(targets = targets map (_.path.toString)) +} /** * Simple router that broadcasts the message to all routees, and replies with the first response. *
@@ -385,7 +277,7 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { +case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { /** * Constructor that sets nrOfInstances to be created. @@ -399,31 +291,13 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: It * Constructor that sets the targets to be used. * Java API */ - def this(t: java.util.Collection[ActorRef]) = { + def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } - def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { - deploy match { - case Some(d) ⇒ - // In case there is a config then use this over any programmed settings. - copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) - case _ ⇒ this - } - } - - def apply(): Actor = new Actor { - def receive = { - case _ ⇒ - } - } - - def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { - val routees: Vector[ActorRef] = (nrOfInstances, targets) match { - case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) - case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs - } + def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { + val routees: Vector[ActorRef] = + createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) { (sender, message) ⇒ val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME! diff --git a/akka-actor/src/main/scala/akka/routing/package.scala b/akka-actor/src/main/scala/akka/routing/package.scala new file mode 100644 index 0000000000..f81d47e6e1 --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/package.scala @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka + +import akka.actor.ActorRef + +package object routing { + + case class Destination(sender: ActorRef, recipient: ActorRef) + + type Route = (ActorRef, Any) ⇒ Iterable[Destination] + +} \ No newline at end of file diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 41eaf2e117..4083a64ea2 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -7,6 +7,29 @@ akka { + actor { + + deployment { + + default { + + remote = "" # if this is set to a valid remote address, the named actor will be deployed at that node + # e.g. "akka://sys@host:port" + + target { + nodes = [] # A list of hostnames and ports for instantiating the children of a non-direct router + # The format should be on "akka://sys@host:port", where: + # - sys is the remote actor system name + # - hostname can be either hostname or IP address the remote actor should connect to + # - port should be the port for the remote server on the other node + # The number of actor instances to be spawned is still taken from the nr-of-instances + # setting as for local routers; the instances will be distributed round-robin among the + # given nodes. + } + } + } + } + remote { transport = "akka.remote.netty.NettyRemoteSupport" diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index b67b7805be..a3f45a6774 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -10,7 +10,6 @@ import akka.actor.Status._ import akka.util._ import akka.util.duration._ import akka.util.Helpers._ -import akka.actor.DeploymentConfig._ import akka.serialization.Compression.LZF import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 391198efb3..d316a50cea 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -87,7 +87,7 @@ class RemoteActorRefProvider( */ @scala.annotation.tailrec - def lookupRemotes(p: Iterable[String]): Option[DeploymentConfig.Deploy] = { + def lookupRemotes(p: Iterable[String]): Option[Deploy] = { p.headOption match { case None ⇒ None case Some("remote") ⇒ lookupRemotes(p.drop(2)) @@ -104,7 +104,7 @@ class RemoteActorRefProvider( }) deployment match { - case Some(DeploymentConfig.Deploy(_, _, _, _, RemoteDeploymentConfig.RemoteScope(address))) ⇒ + case Some(Deploy(_, _, _, _, RemoteScope(address))) ⇒ if (address == rootPath.address) local.actorOf(system, props, supervisor, path, false) else address.parse(remote.transports) match { case Left(x) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index 69e2e553ba..cd2abce344 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -4,21 +4,14 @@ package akka.remote import akka.actor._ -import akka.actor.DeploymentConfig._ import akka.event.EventStream import com.typesafe.config._ import akka.config.ConfigurationException -object RemoteDeploymentConfig { - - case class RemoteScope(node: UnparsedSystemAddress[UnparsedTransportAddress]) extends DeploymentConfig.Scope - -} +case class RemoteScope(node: UnparsedSystemAddress[UnparsedTransportAddress]) extends Scope class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings) { - import RemoteDeploymentConfig._ - override protected def parseConfig(path: String, config: Config): Option[Deploy] = { import scala.collection.JavaConverters._ import akka.util.ReflectiveAccess._ diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala index 5e554d4597..08a1956b4b 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -3,7 +3,6 @@ package akka.remote.random_routed import akka.actor.Actor import akka.remote._ import akka.routing._ -import akka.routing.Routing.Broadcast import akka.testkit.DefaultTimeout object RandomRoutedRemoteActorMultiJvmSpec { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index d0f0f4e03f..b992edecdc 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -3,7 +3,6 @@ package akka.remote.round_robin_routed import akka.actor.Actor import akka.remote._ import akka.routing._ -import akka.routing.Routing.Broadcast import akka.testkit.DefaultTimeout object RoundRobinRoutedRemoteActorMultiJvmSpec { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 8e0a6c1e1e..f4ab00e945 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -3,7 +3,6 @@ package akka.remote.scatter_gather_routed import akka.actor.Actor import akka.remote._ import akka.routing._ -import akka.routing.Routing.Broadcast import akka.testkit.DefaultTimeout object ScatterGatherRoutedRemoteActorMultiJvmSpec { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index e94e0b8fd3..bb219f3d55 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -5,9 +5,8 @@ package akka.remote import akka.testkit._ import akka.actor._ +import akka.routing._ import com.typesafe.config._ -import akka.actor.DeploymentConfig._ -import akka.remote.RemoteDeploymentConfig.RemoteScope object RemoteDeployerSpec { val deployerConf = ConfigFactory.parseString(""" @@ -41,9 +40,9 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { deployment must be(Some( Deploy( service, + deployment.get.config, None, - RoundRobin, - NrOfInstances(3), + RoundRobinRouter(3), RemoteScope(UnparsedSystemAddress(Some("sys"), UnparsedTransportAddress("akka", "wallace", 2552)))))) }