diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index fb1c0a4637..d4d745ae8f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -1,7 +1,5 @@ package akka.routing -import akka.routing._ -import akka.config.ConfigurationException import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import collection.mutable.LinkedList @@ -17,6 +15,7 @@ object RoutingSpec { println("Hello") } } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -26,30 +25,25 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { import akka.routing.RoutingSpec._ - // TODO (HE) : Update test with new routing functionality - /* - "direct router" must { + "no router" must { "be started when constructed" in { - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - actor.isTerminated must be(false) + val routedActor = system.actorOf(Props(new TestActor).withRouting(NoRouter)) + routedActor.isTerminated must be(false) } "send message to connection" in { val doneLatch = new CountDownLatch(1) val counter = new AtomicInteger(0) - val connection1 = system.actorOf(new Actor { + + class Actor1 extends Actor { def receive = { case "end" ⇒ doneLatch.countDown() case _ ⇒ counter.incrementAndGet } - }) + } - val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val routedActor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new Actor1).withRouting(NoRouter)) routedActor ! "hello" routedActor ! "end" @@ -57,38 +51,12 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter.get must be(1) } - - "deliver a broadcast message" in { - val doneLatch = new CountDownLatch(1) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter1.addAndGet(msg) - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(1) - actor ! "end" - - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - - counter1.get must be(1) - } } "round robin router" must { - "be started when constructed" in { - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - actor.isTerminated must be(false) + val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) } //In this test a bunch of actors are created and each actor has its own counter. @@ -101,32 +69,30 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val doneLatch = new CountDownLatch(connectionCount) //lets create some connections. - var connections = new LinkedList[ActorRef] + var actors = new LinkedList[ActorRef] var counters = new LinkedList[AtomicInteger] for (i ← 0 until connectionCount) { counters = counters :+ new AtomicInteger() - val connection = system.actorOf(new Actor { + val actor = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counters.get(i).get.addAndGet(msg) } }) - connections = connections :+ connection + actors = actors :+ actor } - //create the routed actor. - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(connections)) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = actors))) //send messages to the actor. for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { - actor ! (k + 1) + routedActor ! (k + 1) } } - actor ! Broadcast("end") + routedActor ! Broadcast("end") //now wait some and do validations. doneLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -140,7 +106,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val doneLatch = new CountDownLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter1.addAndGet(msg) @@ -148,18 +114,17 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { }) val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { + val actor2 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(actor1, actor2)))) - actor ! Broadcast(1) - actor ! Broadcast("end") + routedActor ! Broadcast(1) + routedActor ! Broadcast("end") doneLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -167,6 +132,8 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter2.get must be(1) } + // TODO (HE) : Is this still a valid test case? + /* "fail to deliver a broadcast message using the ?" in { val doneLatch = new CountDownLatch(1) @@ -178,33 +145,32 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { } }) - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(connection1)))) - intercept[RoutingException] { actor ? Broadcast(1) } + intercept[RoutingException] { + routedActor ? Broadcast(1) + } - actor ! "end" + routedActor ! "end" doneLatch.await(5, TimeUnit.SECONDS) must be(true) counter1.get must be(0) } + */ + } "random router" must { "be started when constructed" in { - - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - actor.isTerminated must be(false) + val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) } "deliver a broadcast message" in { val doneLatch = new CountDownLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter1.addAndGet(msg) @@ -212,18 +178,17 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { }) val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { + val actor2 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(targets = List(actor1, actor2)))) - actor ! Broadcast(1) - actor ! Broadcast("end") + routedActor ! Broadcast(1) + routedActor ! Broadcast("end") doneLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -231,49 +196,118 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter2.get must be(1) } - "fail to deliver a broadcast message using the ?" in { - val doneLatch = new CountDownLatch(1) + // TODO (HE) : Is this still a valid test case? + /* + "fail to deliver a broadcast message using the ?" in { + val doneLatch = new CountDownLatch(1) + + val counter1 = new AtomicInteger + val connection1 = system.actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case _ ⇒ counter1.incrementAndGet() + } + }) + + val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1))) + val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + + try { + actor ? Broadcast(1) + fail() + } catch { + case e: RoutingException ⇒ + } + + actor ! "end" + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + counter1.get must be(0) + } + */ + } + + "broadcast router" must { + "be started when constructed" in { + val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) + } + + "broadcast message using !" in { + val doneLatch = new CountDownLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter1.incrementAndGet() + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter1.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val counter2 = new AtomicInteger + val actor2 = system.actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter2.addAndGet(msg) + } + }) - try { - actor ? Broadcast(1) - fail() - } catch { - case e: RoutingException ⇒ - } + val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2)))) + routedActor ! 1 + routedActor ! "end" - actor ! "end" doneLatch.await(5, TimeUnit.SECONDS) must be(true) - counter1.get must be(0) + + counter1.get must be(1) + counter2.get must be(1) + } + + "broadcast message using ?" in { + val doneLatch = new CountDownLatch(2) + + val counter1 = new AtomicInteger + val actor1 = system.actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ + counter1.addAndGet(msg) + sender ! "ack" + } + }) + + val counter2 = new AtomicInteger + val actor2 = system.actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter2.addAndGet(msg) + } + }) + + val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2)))) + routedActor ? 1 + routedActor ! "end" + + doneLatch.await(5, TimeUnit.SECONDS) must be(true) + + counter1.get must be(1) + counter2.get must be(1) } } - "Scatter-gather router" must { + // TODO (HE) : add tests below + /* - "return response, even if one of the connections has stopped" in { +"Scatter-gather router" must { - val shutdownLatch = new TestLatch(1) + "return response, even if one of the actors has stopped" in { + val shutdownLatch = new TestLatch(1) + val actor1 = newActor(1, Some(shutdownLatch)) + val actor2 = newActor(2, Some(shutdownLatch)) + val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2)))) - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(Stop(Some(0))) - - shutdownLatch.await - - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - } + routedActor ! Broadcast(Stop(Some(1))) + shutdownLatch.await + (routedActor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) + } "throw an exception, if all the connections have stopped" in { @@ -330,7 +364,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val connection = system.actorOf(new Actor { def receive = { - case "end" ⇒ doneLatch.countDown() + case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counters.get(i).get.addAndGet(msg) } }) @@ -363,7 +397,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val counter1 = new AtomicInteger val connection1 = system.actorOf(new Actor { def receive = { - case "end" ⇒ doneLatch.countDown() + case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter1.addAndGet(msg) } }) @@ -371,7 +405,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val counter2 = new AtomicInteger val connection2 = system.actorOf(new Actor { def receive = { - case "end" ⇒ doneLatch.countDown() + case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) @@ -389,95 +423,31 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter2.get must be(1) } - case class Stop(id: Option[Int] = None) - def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor { - def receive = { - case Stop(None) ⇒ self.stop() - case Stop(Some(_id)) if (_id == id) ⇒ self.stop() - case _id: Int if (_id == id) ⇒ - case _ ⇒ Thread sleep 100 * id; sender.tell(id) + case class Stop(id: Option[Int] = None) + + def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor { + def receive = { + case Stop(None) ⇒ + println(">>>> STOPPING : " + id) + self.stop() + case Stop(Some(_id)) if (_id == id) ⇒ + println(">>>> STOPPING >: " + id) + self.stop() + case _id: Int if (_id == id) ⇒ + println("-----> ID MATCH - do nothing") + case x ⇒ { + Thread sleep 100 * id + println("-----> SENDING REPLY: " + id) + sender.tell(id) } - - override def postStop = { - shudownLatch foreach (_.countDown()) - } - }) - } - - "broadcast router" must { - - "be started when constructed" in { - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo") - actor.isTerminated must be(false) } - "broadcast message using !" in { - val doneLatch = new CountDownLatch(2) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter1.addAndGet(msg) - } - }) - - val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter2.addAndGet(msg) - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo") - - actor ! 1 - actor ! "end" - - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - - counter1.get must be(1) - counter2.get must be(1) + override def postStop = { + println("***** POSTSTOP") + shudownLatch foreach (_.countDown()) } - - "broadcast message using ?" in { - val doneLatch = new CountDownLatch(2) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ - counter1.addAndGet(msg) - sender ! "ack" - } - }) - - val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter2.addAndGet(msg) - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo") - - actor ? 1 - actor ! "end" - - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - - counter1.get must be(1) - counter2.get must be(1) - } - } - */ + }) +} +*/ } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 3df8006f6f..9f0a31735d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -520,17 +520,14 @@ class LocalActorRefProvider( val path = supervisor.path / name props.routerConfig match { - case NoRouting ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor + case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor case routedActor ⇒ new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path) } } private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = { val lookupPath = p.elements.mkString("/", "/", "") - println("**** LOOKUP PATH : " + lookupPath) - val deploy = deployer.lookup(lookupPath) - println("**** " + deploy) - r.adaptFromDeploy(deploy) + r.adaptFromDeploy(deployer.lookup(lookupPath)) } def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 62fa4d76fe..6b30ee351a 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -156,7 +156,7 @@ object DeploymentConfig { } def routerTypeFor(routing: Routing): RouterType = routing match { - case _: NoRouting | NoRouting ⇒ RouterType.NoRouting + case _: NoRouting | NoRouting ⇒ RouterType.NoRouter case _: RoundRobin | RoundRobin ⇒ RouterType.RoundRobin case _: Random | Random ⇒ RouterType.Random case _: ScatterGather | ScatterGather ⇒ RouterType.ScatterGather diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 6ea19bca6e..7425db85bb 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -8,7 +8,7 @@ import akka.dispatch._ import akka.japi.Creator import akka.util._ import collection.immutable.Stack -import akka.routing.{ NoRouting, RouterConfig, RoutedProps } +import akka.routing.{ NoRouter, RouterConfig } /** * ActorRef configuration object, this is threadsafe and fully sharable @@ -29,7 +29,7 @@ object Props { case _ ⇒ Escalate } - final val defaultRoutedProps: RouterConfig = NoRouting + final val defaultRoutedProps: RouterConfig = NoRouter final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index dd94253d57..f30f3b3257 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -7,12 +7,15 @@ package akka.routing import akka.actor._ import akka.japi.Creator -import akka.util.ReflectiveAccess import java.lang.reflect.InvocationTargetException import akka.config.ConfigurationException import akka.routing.Routing.Broadcast import akka.actor.DeploymentConfig.Deploy import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.Future +import akka.util.{ Duration, ReflectiveAccess } +import java.util.concurrent.TimeUnit +import akka.AkkaException sealed trait RouterType @@ -26,7 +29,7 @@ object RouterType { /** * A RouterType that indicates no routing - i.e. direct message. */ - object NoRouting extends RouterType + object NoRouter extends RouterType /** * A RouterType that randomly selects a connection to send a message to. @@ -70,6 +73,11 @@ object RouterType { } +/** + * An {@link AkkaException} thrown when something goes wrong while routing a message + */ +class RoutingException(message: String) extends AkkaException(message) + /** * Contains the configuration to create local and clustered routed actor references. * Routed ActorRef configuration object, this is thread safe and fully sharable. @@ -84,51 +92,9 @@ case class RoutedProps private[akka] ( } } -///** -// * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the -// * {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}. -// * -// * @author Jonas Bonér -// */ -//trait Router { -// -// /** -// * Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for -// * the current connections, signal that there were problems with one of the connections and see if there have -// * been changes in the connections. -// * -// * This method is not threadsafe, and should only be called once -// * -// * JMM Guarantees: -// * This method guarantees that all changes made in this method, are visible before one of the routing methods is called. -// */ -// def init(connectionManager: ConnectionManager) -// -// /** -// * Routes the message to one of the connections. -// * -// * @throws RoutingException if something goes wrong while routing the message -// */ -// def route(message: Any)(implicit sender: ActorRef) -// -// /** -// * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the -// * completion of the processing of the message. -// * -// * @throws RoutingExceptionif something goes wrong while routing the message. -// */ -// def route[T](message: Any, timeout: Timeout): Future[T] -//} -// -///** -// * An {@link AkkaException} thrown when something goes wrong while routing a message -// */ -//class RoutingException(message: String) extends AkkaException(message) -// - /** - * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to - * on (or more) of these actors. + * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to + * send a message to on (or more) of these actors. */ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath) extends LocalActorRef( @@ -141,39 +107,33 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup override def !(message: Any)(implicit sender: ActorRef = null) { route(message) match { - case null ⇒ super.!(message)(sender) - case ref: ActorRef ⇒ ref.!(message)(sender) - case refs: Traversable[ActorRef] ⇒ refs foreach (_.!(message)(sender)) + case null ⇒ super.!(message)(sender) + case ref: ActorRef ⇒ ref.!(message)(sender) + case refs: Traversable[ActorRef] ⇒ + message match { + case Broadcast(m) ⇒ refs foreach (_.!(m)(sender)) + case _ ⇒ refs foreach (_.!(message)(sender)) + } } } + + // TODO (HE) : Should the RoutedActorRef also override "?"? + // If not how then Broadcast messages cannot be sent via ? - + // which it is in some test cases at the moment. } trait RouterConfig extends Function0[Actor] { def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig + def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Routing.Route } /** - * Routing configuration that indicates no routing. - * Oxymoron style. - */ -case object NoRouting extends RouterConfig { - - def adaptFromDeploy(deploy: Option[Deploy]) = null - - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null - - def apply(): Actor = null -} - -/** - * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the - * {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}. + * A Router is responsible for sending a message to one (or more) of its connections. * * @author Jonas Bonér */ trait Router { - // TODO (HE): implement failure detection } /** @@ -206,6 +166,18 @@ object Routing { type Route = (Any) ⇒ AnyRef } +/** + * Routing configuration that indicates no routing. + * Oxymoron style. + */ +case object NoRouter extends RouterConfig { + def adaptFromDeploy(deploy: Option[Deploy]) = null + + def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null + + def apply(): Actor = null +} + /** * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. *
@@ -222,8 +194,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { deploy match { - case Some(d) ⇒ copy(nrOfInstances = d.nrOfInstances.factor) - case _ ⇒ this + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this } } @@ -236,12 +210,8 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { val routees: Vector[ActorRef] = (nrOfInstances, targets) match { case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ - println("----> 0, Nil") - (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouting)))(scala.collection.breakOut) - case (x, xs) ⇒ - println("----> x, xs") - Vector.empty[ActorRef] ++ xs + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) + case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs } val next = new AtomicInteger(0) @@ -251,292 +221,164 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] } { - case _: AutoReceivedMessage ⇒ null //TODO: handle system specific messages - case Broadcast(msg) ⇒ routees - case msg ⇒ getNext() + case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? + case Broadcast(msg) ⇒ routees + case msg ⇒ getNext() } } - - /* - private val state = new AtomicReference[RoundRobinState] - - def next: Option[ActorRef] = currentState.next - - @tailrec - private def currentState: RoundRobinState = { - val current = state.get - - if (current != null && current.version == connectionManager.version) { - //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state. - current - } else { - //there has been a change in connections, or it was the first try, so we need to update the internal state - - val connections = connectionManager.connections - val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version) - if (state.compareAndSet(current, newState)) - //we are lucky since we just updated the state, so we can send it back as the state to use - newState - else //we failed to update the state, lets try again... better luck next time. - currentState - } - } - - private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) { - - private val index = new AtomicInteger(0) - - def next: Option[ActorRef] = if (array.isEmpty) None else Some(array(nextIndex)) - - @tailrec - private def nextIndex: Int = { - val oldIndex = index.get - var newIndex = if (oldIndex == array.length - 1) 0 else oldIndex + 1 - - if (!index.compareAndSet(oldIndex, newIndex)) nextIndex - else oldIndex - } - } - */ } -///** -// * An Abstract Router implementation that already provides the basic infrastructure so that a concrete -// * Router only needs to implement the next method. -// */ -//trait BasicRouter extends Router { -// -// @volatile -// protected var connectionManager: ConnectionManager = _ -// -// def init(connectionManager: ConnectionManager) = { -// this.connectionManager = connectionManager -// } -// -// def route(message: Any)(implicit sender: ActorRef) = message match { -// case Routing.Broadcast(message) ⇒ -// -// //it is a broadcast message, we are going to send to message to all connections. -// connectionManager.connections.iterable foreach { -// connection ⇒ -// try { -// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// } -// case _ ⇒ -// //it no broadcast message, we are going to select an actor from the connections and send the message to him. -// next match { -// case Some(connection) ⇒ -// try { -// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// case None ⇒ -// throwNoConnectionsError -// } -// } -// -// def route[T](message: Any, timeout: Timeout): Future[T] = message match { -// case Routing.Broadcast(message) ⇒ -// throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.") -// case _ ⇒ -// //it no broadcast message, we are going to select an actor from the connections and send the message to him. -// next match { -// case Some(connection) ⇒ -// try { -// connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?? -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// case None ⇒ -// throwNoConnectionsError -// } -// } -// -// protected def next: Option[ActorRef] -// -// private def throwNoConnectionsError = throw new RoutingException("No replica connections for router") -//} -// -///** -// * A Router that uses broadcasts a message to all its connections. -// */ -//class BroadcastRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter with Serializable { -// override def route(message: Any)(implicit sender: ActorRef) = { -// connectionManager.connections.iterable foreach { -// connection ⇒ -// try { -// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// } -// } -// -// //protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = -// override def route[T](message: Any, timeout: Timeout): Future[T] = { -// import Future._ -// implicit val t = timeout -// val futures = connectionManager.connections.iterable map { -// connection ⇒ -// connection.?(message, timeout).asInstanceOf[Future[T]] -// } -// Future.firstCompletedOf(futures) -// } -// -// protected def next: Option[ActorRef] = None -//} -// -///** -// * A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef. -// * -// * @author Jonas Bonér -// */ -//class DirectRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter { -// -// private val state = new AtomicReference[DirectRouterState] -// -// lazy val next: Option[ActorRef] = { -// val current = currentState -// if (current.ref == null) None else Some(current.ref) -// } -// -// @tailrec -// private def currentState: DirectRouterState = { -// val current = state.get -// -// if (current != null && connectionManager.version == current.version) { -// //we are lucky since nothing has changed in the connections. -// current -// } else { -// //there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating. -// -// val connections = connectionManager.connections -// -// val connectionCount = connections.iterable.size -// if (connectionCount > 1) -// throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount)) -// -// val newState = new DirectRouterState(connections.iterable.head, connections.version) -// if (state.compareAndSet(current, newState)) -// //we are lucky since we just updated the state, so we can send it back as the state to use -// newState -// else //we failed to update the state, lets try again... better luck next time. -// currentState // recur -// } -// } -// -// private case class DirectRouterState(ref: ActorRef, version: Long) -// -//} -// -///** -// * A Router that randomly selects one of the target connections to send a message to. -// * -// * @author Jonas Bonér -// */ -//class RandomRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter { -// -// import java.security.SecureRandom -// -// private val state = new AtomicReference[RandomRouterState] -// -// private val random = new ThreadLocal[SecureRandom] { -// override def initialValue = SecureRandom.getInstance("SHA1PRNG") -// } -// -// def next: Option[ActorRef] = currentState.array match { -// case a if a.isEmpty ⇒ None -// case a ⇒ Some(a(random.get.nextInt(a.length))) -// } -// -// @tailrec -// private def currentState: RandomRouterState = { -// val current = state.get -// -// if (current != null && current.version == connectionManager.version) { -// //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state. -// current -// } else { -// //there has been a change in connections, or it was the first try, so we need to update the internal state -// -// val connections = connectionManager.connections -// val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version) -// if (state.compareAndSet(current, newState)) -// //we are lucky since we just updated the state, so we can send it back as the state to use -// newState -// else //we failed to update the state, lets try again... better luck next time. -// currentState -// } -// } -// -// private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long) -//} +/** + * A Router that randomly selects one of the target connections to send a message to. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) + extends Router with RouterConfig { + + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { + deploy match { + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this + } + } + + def apply(): Actor = new Actor { + def receive = { + case _ ⇒ + } + } + + import java.security.SecureRandom + + private val random = new ThreadLocal[SecureRandom] { + override def initialValue = SecureRandom.getInstance("SHA1PRNG") + } + + def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + val routees: Vector[ActorRef] = (nrOfInstances, targets) match { + case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) + case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs + } + + def getNext(): ActorRef = { + routees(random.get.nextInt(routees.size)) + } + + { + case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? + case Broadcast(msg) ⇒ routees + case msg ⇒ getNext() + } + } +} /** - * ScatterGatherRouter broadcasts the message to all connections and gathers results according to the - * specified strategy (specific router needs to implement `gather` method). - * Scatter-gather pattern will be applied only to the messages broadcasted using Future - * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget - * mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type - * - * FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected. - * FIXME: this is also the location where message buffering should be done in case of failure. + * A Router that uses broadcasts a message to all its connections. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -/* -trait ScatterGatherRouter extends BasicRouter with Serializable { +case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) + extends Router with RouterConfig { - /** - * Aggregates the responses into a single Future. - * - * @param results Futures of the responses from connections - */ - protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { + deploy match { + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this + } + } - private def scatterGather[S, G >: S](message: Any, timeout: Timeout): Future[G] = { - val responses = connectionManager.connections.iterable.flatMap { - actor ⇒ + def apply(): Actor = new Actor { + def receive = { + case _ ⇒ + } + } + + def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + val routees: Vector[ActorRef] = (nrOfInstances, targets) match { + case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) + case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs + } + + { + case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? + case Broadcast(msg) ⇒ routees + case msg ⇒ routees + } + } +} + +// TODO (HE) : Correct description below +/** + * Simple router that broadcasts the message to all connections, and replies with the first response. + * Scatter-gather pattern will be applied only to the messages broadcast using Future + * (wrapped into {@link Routing.Broadcast} and sent with "?" method). + * For the messages sent in a fire-forget mode, the router would behave as {@link RoundRobinRouter} + */ +case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { + + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { + deploy match { + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this + } + } + + def apply(): Actor = new Actor { + def receive = { + case _ ⇒ + } + } + + def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + val routees: Vector[ActorRef] = (nrOfInstances, targets) match { + case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) + case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs + } + + def scatterGather[S, G >: S](message: Any, t: Timeout): Future[G] = { + val responses = routees.flatMap { actor ⇒ try { - if (actor.isTerminated) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace - Some(actor.?(message, timeout).asInstanceOf[Future[S]]) + if (actor.isTerminated) None else Some(actor.?(message, t).asInstanceOf[Future[S]]) } catch { - case e: Exception ⇒ - connectionManager.remove(actor) - None + case e: Exception ⇒ None } + } + + if (!responses.isEmpty) throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message)) + else { + implicit val messageDispatcher = context.dispatcher + implicit val timeout = t + Future.firstCompletedOf(responses) + } } - if (responses.isEmpty) - throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message)) - else gather(responses) - } - - override def route[T](message: Any, timeout: Timeout): Future[T] = message match { - case Routing.Broadcast(message) ⇒ scatterGather(message, timeout) - case message ⇒ super.route(message, timeout) + // TODO (HE) : Timeout and Future should be updated to new strategy - or hardcoded value below should at least be removed! + { + case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? + case Broadcast(msg) ⇒ routees + case msg ⇒ scatterGather(msg, Timeout(Duration(5000, TimeUnit.MILLISECONDS))) + } } } -*/ - -/** - * Simple router that broadcasts the message to all connections, and replies with the first response - * Scatter-gather pattern will be applied only to the messages broadcasted using Future - * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget - * mode, the router would behave as {@link RoundRobinRouter} - */ -/* -class ScatterGatherFirstCompletedRouter(implicit dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter { - protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results) -} -*/ diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 49a3a2ca14..f0bdc36547 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -6,12 +6,12 @@ package akka.tutorial.first.scala import java.util.concurrent.CountDownLatch import akka.actor._ import akka.routing._ -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{ ConfigFactory, Config } object Pi extends App { // Initiate the calculation - calculate(nrOfWorkers = 4, nrOfElements = 10, nrOfMessages = 10) + calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) // ==================== // ===== Messages ===== @@ -39,7 +39,6 @@ object Pi extends App { def receive = { case Work(start, nrOfElements) ⇒ - println("*** RECEIVED MESSAGE IN: " + self.path) sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work } } @@ -55,26 +54,11 @@ object Pi extends App { var start: Long = _ // create the workers + val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker]) - var workers = Vector.empty[ActorRef] - for (i ← 1 to 2) { - workers = context.actorOf[Worker] +: workers - } - - // TODO (HE) : use this way of creating actors - //val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker]) - - /* - // wrap them with a load-balancing router - // FIXME routers are intended to be used like this - implicit val timout = context.system.settings.ActorTimeout - implicit val dispatcher = context.dispatcher - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers)) - val router = new RoutedActorRef(context.system, props, self.asInstanceOf[InternalActorRef], "pi") - */ - - //val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi") - val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi") + // create a round robin router for the workers + val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi") + //val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi") // message handler def receive = { @@ -107,7 +91,7 @@ object Pi extends App { // ===== Run it ===== // ================== def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - val system = ActorSystem() + val system = ActorSystem("PiSystem", ConfigFactory.load("akka.conf")) // this latch is only plumbing to know when the calculation is completed val latch = new CountDownLatch(1)