diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index e4a07d02c8..41ab687ad7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -22,7 +22,7 @@ class DeployerSpec extends WordSpec with MustMatchers { LeastCPU, NrOfInstances(3), BannagePeriodFailureDetector(10), - RemoteScope("localhost", 2552)))) + RemoteScope(List(RemoteAddress("localhost", 2552)))))) // ClusterScope( // List(Node("node1")), // new NrOfInstances(3), diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 02dfa121e2..00a3366e7f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -14,103 +14,6 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit } class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { - // "direct router" must { - - // "be able to shut down its instance" in { - // val address = "direct-0" - - // Deployer.deploy( - // Deploy( - // address, - // None, - // Direct, - // NrOfInstances(1), - // RemoveConnectionOnFirstFailureLocalFailureDetector, - // LocalScope)) - - // val helloLatch = new CountDownLatch(1) - // val stopLatch = new CountDownLatch(1) - - // val actor = actorOf(new Actor { - // def receive = { - // case "hello" ⇒ helloLatch.countDown() - // } - - // override def postStop() { - // stopLatch.countDown() - // } - // }, address) - - // actor ! "hello" - - // helloLatch.await(5, TimeUnit.SECONDS) must be(true) - - // actor.stop() - - // stopLatch.await(5, TimeUnit.SECONDS) must be(true) - // } - - // "send message to connection" in { - // val address = "direct-1" - - // Deployer.deploy( - // Deploy( - // address, - // None, - // Direct, - // NrOfInstances(1), - // RemoveConnectionOnFirstFailureLocalFailureDetector, - // LocalScope)) - - // val doneLatch = new CountDownLatch(1) - - // val counter = new AtomicInteger(0) - // val actor = actorOf(new Actor { - // def receive = { - // case "end" ⇒ doneLatch.countDown() - // case _ ⇒ counter.incrementAndGet() - // } - // }, address) - - // actor ! "hello" - // actor ! "end" - - // doneLatch.await(5, TimeUnit.SECONDS) must be(true) - - // counter.get must be(1) - // } - - // "deliver a broadcast message" in { - // val address = "direct-2" - - // Deployer.deploy( - // Deploy( - // address, - // None, - // Direct, - // NrOfInstances(1), - // RemoveConnectionOnFirstFailureLocalFailureDetector, - // LocalScope)) - - // val doneLatch = new CountDownLatch(1) - - // val counter1 = new AtomicInteger - // val actor = actorOf(new Actor { - // def receive = { - // case "end" ⇒ doneLatch.countDown() - // case msg: Int ⇒ counter1.addAndGet(msg) - // } - // }, address) - - // actor ! Broadcast(1) - // actor ! "end" - - // doneLatch.await(5, TimeUnit.SECONDS) must be(true) - - // counter1.get must be(1) - // } - // } - "round robin router" must { "be able to shut down its instance" in { diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 393b4bc7e7..8e9d460e1c 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -231,10 +231,29 @@ object Deployer extends ActorDeployer { if (addressConfig.getSection("cluster").isDefined) throw new ConfigurationException( "Configuration for deployment ID [" + address + "] can not have both 'remote' and 'cluster' sections.") - val hostname = remoteConfig.getString("hostname", "localhost") - val port = remoteConfig.getInt("port", 2552) + // -------------------------------- + // akka.actor.deployment.
.remote.nodes + // -------------------------------- + val remoteAddresses = remoteConfig.getList("nodes") match { + case Nil ⇒ Nil + case nodes ⇒ + def raiseRemoteNodeParsingError() = throw new ConfigurationException( + "Config option [" + addressPath + + ".remote.nodes] needs to be a list with elements on format \":\", was [" + nodes.mkString(", ") + "]") - Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(hostname, port))) + nodes map { node ⇒ + val tokenizer = new java.util.StringTokenizer(node, ":") + val hostname = tokenizer.nextElement.toString + if ((hostname eq null) || (hostname == "")) raiseRemoteNodeParsingError() + val port = try tokenizer.nextElement.toString.toInt catch { + case e: Exception ⇒ raiseRemoteNodeParsingError() + } + if (port == 0) raiseRemoteNodeParsingError() + RemoteAddress(hostname, port) + } + } + + Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(remoteAddresses))) case None ⇒ // check for 'cluster' config section diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index be5d9fcb5a..b7b7ffa6e8 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -79,9 +79,7 @@ object DeploymentConfig { preferredNodes: Iterable[Home] = Vector(Node(Config.nodename)), replication: ReplicationScheme = Transient) extends Scope - case class RemoteScope( - hostname: String = "localhost", - port: Int = 2552) extends Scope + case class RemoteScope(nodes: Iterable[RemoteAddress]) extends Scope // For Java API case class LocalScope() extends Scope @@ -89,6 +87,8 @@ object DeploymentConfig { // For Scala API case object LocalScope extends Scope + case class RemoteAddress(hostname: String, port: Int) + // -------------------------------- // --- Home // -------------------------------- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 826721f7b6..c51fbabc91 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -5,6 +5,7 @@ package akka.remote import akka.actor._ +import akka.routing._ import DeploymentConfig._ import Actor._ import Status._ @@ -44,18 +45,58 @@ class RemoteActorRefProvider extends ActorRefProvider { if (oldFuture eq null) { // we won the race -- create the actor and resolve the future val actor = try { Deployer.lookupDeploymentFor(address) match { - case Some(Deploy(_, _, router, nrOfInstances, _, RemoteScope(host, port))) ⇒ - // FIXME create RoutedActorRef if 'router' is specified + case Some(Deploy(_, _, router, nrOfInstances, _, RemoteScope(remoteAddresses))) ⇒ - val serverAddress = Remote.address - if (serverAddress.getHostName == host && serverAddress.getPort == port) { - // home node for this remote actor + val thisHostname = Remote.address.getHostName + val thisPort = Remote.address.getPort + + def isReplicaNode: Boolean = remoteAddresses exists { remoteAddress ⇒ + remoteAddress.hostname == thisHostname && remoteAddress.port == thisPort + } + + if (isReplicaNode) { + // we are on one of the replica node for this remote actor Some(new LocalActorRef(props, address, false)) // create a local actor } else { - // not home node, need to provision it - val remoteAddress = new InetSocketAddress(host, port) - useActorOnNode(remoteAddress, address, props.creator) - Some(RemoteActorRef(remoteAddress, address, Actor.TIMEOUT, None)) // create a remote actor + + // we are on the single "reference" node uses the remote actors on the replica nodes + val routerType = DeploymentConfig.routerTypeFor(router) + val routerFactory: () ⇒ Router = routerType match { + case RouterType.Direct ⇒ + if (remoteAddresses.size != 1) throw new ConfigurationException( + "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new DirectRouter + + case RouterType.Random ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RandomRouter + + case RouterType.RoundRobin ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RoundRobinRouter + + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom ⇒ sys.error("Router Custom not supported yet") + } + + def provisionActorToNode(remoteAddress: RemoteAddress): RemoteActorRef = { + val inetSocketAddress = new InetSocketAddress(remoteAddress.hostname, remoteAddress.port) + useActorOnNode(inetSocketAddress, address, props.creator) + RemoteActorRef(inetSocketAddress, address, Actor.TIMEOUT, None) + } + + val connections: Iterable[ActorRef] = remoteAddresses map { provisionActorToNode(_) } + + Some(Routing.actorOf(RoutedProps( + routerFactory = routerFactory, + connections = connections))) } case deploy ⇒ None // non-remote actor diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf index fe1bf0b95d..e57dcbd806 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode1.conf @@ -1,4 +1,3 @@ akka.enabled-modules = ["remote"] akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.remote.hostname = "localhost" -akka.actor.deployment.service-hello.remote.port = 9991 +akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf index fe1bf0b95d..e57dcbd806 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmNode2.conf @@ -1,4 +1,3 @@ akka.enabled-modules = ["remote"] akka.event-handler-level = "WARNING" -akka.actor.deployment.service-hello.remote.hostname = "localhost" -akka.actor.deployment.service-hello.remote.port = 9991 +akka.actor.deployment.service-hello.remote.nodes = ["localhost:9991"] diff --git a/config/akka-reference.conf b/config/akka-reference.conf index aeb50ff719..b8c19c22ed 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -94,8 +94,10 @@ akka { #} remote { - hostname = "localhost" # The remote server hostname or IP address the remote actor should connect to - port = 2552 # The remote server port the remote actor should connect to + nodes = ["wallace:2552", "gromit:2552"] # A list of hostnames and ports for instantiating the remote actor instances + # The format should be on "hostname:port", where: + # - hostname can be either hostname or IP address the remote actor should connect to + # - port should be the port for the remote server on the other node } #cluster { # defines the actor as a clustered actor