From 16e4be6077e84d92e09533f5411f9706eb95f94f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 28 Sep 2011 19:28:49 +0200 Subject: [PATCH] Now treating actor deployed and configured with Direct routing and LocalScope as a "normal" in-process actor (LocalActorRef). MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../routing/ConfiguredLocalRoutingSpec.scala | 146 +++++++++--------- .../scala/akka/actor/ActorRefProvider.scala | 17 +- config/akka-reference.conf | 2 +- 3 files changed, 85 insertions(+), 80 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index d050f37046..32c083be50 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -14,102 +14,102 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit } class ConfiguredLocalRoutingSpec extends WordSpec with MustMatchers { - "direct router" must { + // "direct router" must { - "be able to shut down its instance" in { - val address = "direct-0" + // "be able to shut down its instance" in { + // val address = "direct-0" - Deployer.deploy( - Deploy( - address, - None, - Direct, - ReplicationFactor(1), - RemoveConnectionOnFirstFailureLocalFailureDetector, - LocalScope)) + // Deployer.deploy( + // Deploy( + // address, + // None, + // Direct, + // ReplicationFactor(1), + // RemoveConnectionOnFirstFailureLocalFailureDetector, + // LocalScope)) - val helloLatch = new CountDownLatch(1) - val stopLatch = new CountDownLatch(1) + // val helloLatch = new CountDownLatch(1) + // val stopLatch = new CountDownLatch(1) - val actor = actorOf(new Actor { - def receive = { - case "hello" ⇒ helloLatch.countDown() - } + // val actor = actorOf(new Actor { + // def receive = { + // case "hello" ⇒ helloLatch.countDown() + // } - override def postStop() { - stopLatch.countDown() - } - }, address) + // override def postStop() { + // stopLatch.countDown() + // } + // }, address) - actor ! "hello" + // actor ! "hello" - helloLatch.await(5, TimeUnit.SECONDS) must be(true) + // helloLatch.await(5, TimeUnit.SECONDS) must be(true) - actor.stop() + // actor.stop() - stopLatch.await(5, TimeUnit.SECONDS) must be(true) - } + // stopLatch.await(5, TimeUnit.SECONDS) must be(true) + // } - "send message to connection" in { - val address = "direct-1" + // "send message to connection" in { + // val address = "direct-1" - Deployer.deploy( - Deploy( - address, - None, - Direct, - ReplicationFactor(1), - RemoveConnectionOnFirstFailureLocalFailureDetector, - LocalScope)) + // Deployer.deploy( + // Deploy( + // address, + // None, + // Direct, + // ReplicationFactor(1), + // RemoveConnectionOnFirstFailureLocalFailureDetector, + // LocalScope)) - val doneLatch = new CountDownLatch(1) + // val doneLatch = new CountDownLatch(1) - val counter = new AtomicInteger(0) - val actor = actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter.incrementAndGet() - } - }, address) + // val counter = new AtomicInteger(0) + // val actor = actorOf(new Actor { + // def receive = { + // case "end" ⇒ doneLatch.countDown() + // case _ ⇒ counter.incrementAndGet() + // } + // }, address) - actor ! "hello" - actor ! "end" + // actor ! "hello" + // actor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + // doneLatch.await(5, TimeUnit.SECONDS) must be(true) - counter.get must be(1) - } + // counter.get must be(1) + // } - "deliver a broadcast message" in { - val address = "direct-2" + // "deliver a broadcast message" in { + // val address = "direct-2" - Deployer.deploy( - Deploy( - address, - None, - Direct, - ReplicationFactor(1), - RemoveConnectionOnFirstFailureLocalFailureDetector, - LocalScope)) + // Deployer.deploy( + // Deploy( + // address, + // None, + // Direct, + // ReplicationFactor(1), + // RemoveConnectionOnFirstFailureLocalFailureDetector, + // LocalScope)) - val doneLatch = new CountDownLatch(1) + // val doneLatch = new CountDownLatch(1) - val counter1 = new AtomicInteger - val actor = actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter1.addAndGet(msg) - } - }, address) + // val counter1 = new AtomicInteger + // val actor = actorOf(new Actor { + // def receive = { + // case "end" ⇒ doneLatch.countDown() + // case msg: Int ⇒ counter1.addAndGet(msg) + // } + // }, address) - actor ! Broadcast(1) - actor ! "end" + // actor ! Broadcast(1) + // actor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + // doneLatch.await(5, TimeUnit.SECONDS) must be(true) - counter1.get must be(1) - } - } + // counter1.get must be(1) + // } + // } "round robin router" must { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ec7f339625..ff23e47a6b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -148,8 +148,16 @@ class LocalActorRefProvider extends ActorRefProvider { val actor = try { Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor - case Some(Deploy(_, _, router, nrOfInstances, _, LocalScope)) ⇒ - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(router) match { + + // create a local actor + case None | Some(Deploy(_, _, Direct, _, _, LocalScope)) ⇒ + Some(new LocalActorRef(props, address, systemService)) // create a local actor + + // create a routed actor ref + case deploy @ Some(Deploy(_, _, router, nrOfInstances, _, LocalScope)) ⇒ + val routerType = DeploymentConfig.routerTypeFor(router) + + val routerFactory: () ⇒ Router = routerType match { case RouterType.Direct ⇒ () ⇒ new DirectRouter case RouterType.Random ⇒ () ⇒ new RandomRouter case RouterType.RoundRobin ⇒ () ⇒ new RoundRobinRouter @@ -167,10 +175,7 @@ class LocalActorRefProvider extends ActorRefProvider { routerFactory = routerFactory, connections = connections))) - case None ⇒ - Some(new LocalActorRef(props, address, systemService)) // create a local actor - - case _ ⇒ None // non-local actor + case _ ⇒ None // non-local actor - pass it on } } catch { case e: Exception ⇒ diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 41d2a11b09..5a57dbc449 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -70,7 +70,7 @@ akka { # default is "direct"; # if 'replication' is used then the only available router is "direct" - # replication-factor = 3 # number of actor instances in the cluster + replication-factor = 3 # number of actor instances in the cluster # available: positive integer (0-N) or the string "auto" for auto-scaling # if "auto" is used then 'home' has no meaning # default is '0', meaning no replicas;