diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 5437d0daa0..9002f3b044 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -5,7 +5,6 @@ package akka.actor import akka.testkit.AkkaSpec -import akka.util.duration._ import DeploymentConfig._ import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index fb1c0a4637..751092827e 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -1,7 +1,5 @@ package akka.routing -import akka.routing._ -import akka.config.ConfigurationException import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import collection.mutable.LinkedList @@ -17,6 +15,7 @@ object RoutingSpec { println("Hello") } } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -26,30 +25,25 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { import akka.routing.RoutingSpec._ - // TODO (HE) : Update test with new routing functionality - /* - "direct router" must { + "no router" must { "be started when constructed" in { - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - actor.isTerminated must be(false) + val routedActor = system.actorOf(Props(new TestActor).withRouting(NoRouter)) + routedActor.isTerminated must be(false) } "send message to connection" in { val doneLatch = new CountDownLatch(1) val counter = new AtomicInteger(0) - val connection1 = system.actorOf(new Actor { + + class Actor1 extends Actor { def receive = { case "end" ⇒ doneLatch.countDown() case _ ⇒ counter.incrementAndGet } - }) + } - val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val routedActor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new Actor1).withRouting(NoRouter)) routedActor ! "hello" routedActor ! "end" @@ -57,38 +51,12 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter.get must be(1) } - - "deliver a broadcast message" in { - val doneLatch = new CountDownLatch(1) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter1.addAndGet(msg) - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new DirectRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(1) - actor ! "end" - - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - - counter1.get must be(1) - } } "round robin router" must { - "be started when constructed" in { - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - actor.isTerminated must be(false) + val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) } //In this test a bunch of actors are created and each actor has its own counter. @@ -101,32 +69,30 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val doneLatch = new CountDownLatch(connectionCount) //lets create some connections. - var connections = new LinkedList[ActorRef] + var actors = new LinkedList[ActorRef] var counters = new LinkedList[AtomicInteger] for (i ← 0 until connectionCount) { counters = counters :+ new AtomicInteger() - val connection = system.actorOf(new Actor { + val actor = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counters.get(i).get.addAndGet(msg) } }) - connections = connections :+ connection + actors = actors :+ actor } - //create the routed actor. - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(connections)) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = actors))) //send messages to the actor. for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { - actor ! (k + 1) + routedActor ! (k + 1) } } - actor ! Broadcast("end") + routedActor ! Broadcast("end") //now wait some and do validations. doneLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -140,7 +106,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val doneLatch = new CountDownLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter1.addAndGet(msg) @@ -148,63 +114,37 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { }) val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { + val actor2 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(actor1, actor2)))) - actor ! Broadcast(1) - actor ! Broadcast("end") + routedActor ! Broadcast(1) + routedActor ! Broadcast("end") doneLatch.await(5, TimeUnit.SECONDS) must be(true) counter1.get must be(1) counter2.get must be(1) } - - "fail to deliver a broadcast message using the ?" in { - val doneLatch = new CountDownLatch(1) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter1.incrementAndGet() - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - intercept[RoutingException] { actor ? Broadcast(1) } - - actor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - counter1.get must be(0) - } } "random router" must { "be started when constructed" in { - - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - actor.isTerminated must be(false) + val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) } "deliver a broadcast message" in { val doneLatch = new CountDownLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter1.addAndGet(msg) @@ -212,214 +152,36 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { }) val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { + val actor2 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") + val routedActor = system.actorOf(Props(new TestActor).withRouting(RandomRouter(targets = List(actor1, actor2)))) - actor ! Broadcast(1) - actor ! Broadcast("end") + routedActor ! Broadcast(1) + routedActor ! Broadcast("end") doneLatch.await(5, TimeUnit.SECONDS) must be(true) counter1.get must be(1) counter2.get must be(1) } - - "fail to deliver a broadcast message using the ?" in { - val doneLatch = new CountDownLatch(1) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter1.incrementAndGet() - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - try { - actor ? Broadcast(1) - fail() - } catch { - case e: RoutingException ⇒ - } - - actor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - counter1.get must be(0) - } - } - - "Scatter-gather router" must { - - "return response, even if one of the connections has stopped" in { - - val shutdownLatch = new TestLatch(1) - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(Stop(Some(0))) - - shutdownLatch.await - - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - } - - "throw an exception, if all the connections have stopped" in { - - val shutdownLatch = new TestLatch(2) - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(Stop()) - - shutdownLatch.await - - (intercept[RoutingException] { - actor ? Broadcast(0) - }) must not be (null) - - } - - "return the first response from connections, when all of them replied" in { - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1)))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) - - } - - "return the first response from connections, when some of them failed to reply" in { - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1)))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - } - - "be started when constructed" in { - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0)))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor.isTerminated must be(false) - } - - "deliver one-way messages in a round robin fashion" in { - val connectionCount = 10 - val iterationCount = 10 - val doneLatch = new TestLatch(connectionCount) - - var connections = new LinkedList[ActorRef] - var counters = new LinkedList[AtomicInteger] - for (i ← 0 until connectionCount) { - counters = counters :+ new AtomicInteger() - - val connection = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counters.get(i).get.addAndGet(msg) - } - }) - connections = connections :+ connection - } - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(connections)) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - for (i ← 0 until iterationCount) { - for (k ← 0 until connectionCount) { - actor ! (k + 1) - } - } - - actor ! Broadcast("end") - - doneLatch.await - - for (i ← 0 until connectionCount) { - val counter = counters.get(i).get - counter.get must be((iterationCount * (i + 1))) - } - } - - "deliver a broadcast message using the !" in { - val doneLatch = new TestLatch(2) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter1.addAndGet(msg) - } - }) - - val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counter2.addAndGet(msg) - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(1) - actor ! Broadcast("end") - - doneLatch.await - - counter1.get must be(1) - counter2.get must be(1) - } - - case class Stop(id: Option[Int] = None) - - def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor { - def receive = { - case Stop(None) ⇒ self.stop() - case Stop(Some(_id)) if (_id == id) ⇒ self.stop() - case _id: Int if (_id == id) ⇒ - case _ ⇒ Thread sleep 100 * id; sender.tell(id) - } - - override def postStop = { - shudownLatch foreach (_.countDown()) - } - }) } "broadcast router" must { - "be started when constructed" in { - val actor1 = system.actorOf[TestActor] - - val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(actor1))) - val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo") - actor.isTerminated must be(false) + val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(nrOfInstances = 1))) + routedActor.isTerminated must be(false) } "broadcast message using !" in { val doneLatch = new CountDownLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter1.addAndGet(msg) @@ -427,18 +189,16 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { }) val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { + val actor2 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo") - - actor ! 1 - actor ! "end" + val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2)))) + routedActor ! 1 + routedActor ! "end" doneLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -450,7 +210,7 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { val doneLatch = new CountDownLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ @@ -460,18 +220,16 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { }) val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { + val actor2 = system.actorOf(new Actor { def receive = { case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new BroadcastRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - val actor = new RoutedActorRef(system, props, system.asInstanceOf[ActorSystemImpl].guardian, "foo") - - actor ? 1 - actor ! "end" + val routedActor = system.actorOf(Props(new TestActor).withRouting(BroadcastRouter(targets = List(actor1, actor2)))) + routedActor ? 1 + routedActor ! "end" doneLatch.await(5, TimeUnit.SECONDS) must be(true) @@ -479,5 +237,70 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter2.get must be(1) } } - */ + + "Scatter-gather router" must { + + "be started when constructed" in { + val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(newActor(0))))) + routedActor.isTerminated must be(false) + } + + "deliver a broadcast message using the !" in { + val doneLatch = new TestLatch(2) + + val counter1 = new AtomicInteger + val actor1 = system.actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter1.addAndGet(msg) + } + }) + + val counter2 = new AtomicInteger + val actor2 = system.actorOf(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter2.addAndGet(msg) + } + }) + + val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2)))) + routedActor ! Broadcast(1) + routedActor ! Broadcast("end") + + doneLatch.await + + counter1.get must be(1) + counter2.get must be(1) + } + + "return response, even if one of the actors has stopped" in { + val shutdownLatch = new TestLatch(1) + val actor1 = newActor(1, Some(shutdownLatch)) + val actor2 = newActor(22, Some(shutdownLatch)) + val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2)))) + + routedActor ! Broadcast(Stop(Some(1))) + shutdownLatch.await + (routedActor ? Broadcast(0)).as[Int].get must be(22) + } + + case class Stop(id: Option[Int] = None) + + def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor { + def receive = { + case Stop(None) ⇒ self.stop() + case Stop(Some(_id)) if (_id == id) ⇒ self.stop() + case _id: Int if (_id == id) ⇒ + case x ⇒ { + Thread sleep 100 * id + sender.tell(id) + } + } + + override def postStop = { + shudownLatch foreach (_.countDown()) + } + }), "Actor:" + id) + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 33e501a74b..43936eff12 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -295,7 +295,16 @@ private[akka] class LocalActorRef private[akka] ( def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender) - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout) + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { + actorCell.provider.ask(timeout) match { + case Some(a) ⇒ + this.!(message)(a) + a.result + case None ⇒ + this.!(message)(null) + new DefaultPromise[Any](0)(actorCell.system.dispatcher) + } + } def restart(cause: Throwable): Unit = actorCell.restart(cause) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 044b8e7d87..133f9e98bc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -5,21 +5,13 @@ package akka.actor import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.{ ConcurrentHashMap, TimeUnit } -import scala.annotation.tailrec import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } -import akka.actor.Timeout.intToTimeout -import akka.config.ConfigurationException import akka.dispatch._ import akka.routing._ import akka.AkkaException import akka.util.{ Duration, Switch, Helpers } -import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import akka.event._ -import akka.event.Logging.Error._ -import akka.event.Logging.Warning import java.io.Closeable -import com.typesafe.config.Config /** * Interface for all ActorRef providers to implement. @@ -105,9 +97,10 @@ trait ActorRefProvider { def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef /** - * Create AskActorRef to hook up message send to recipient with Future receiver. + * Create AskActorRef and register it properly so it can be serialized/deserialized; + * caller needs to send the message. */ - def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] + def ask(within: Timeout): Option[AskActorRef] /** * This Future is completed upon termination of this ActorRefProvider, which @@ -527,22 +520,20 @@ class LocalActorRefProvider( def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef = { props.routerConfig match { - case NoRouting ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor + case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor case routedActor ⇒ new RoutedActorRef(system, props.withRouting(adaptFromDeploy(routedActor, path)), supervisor, path) } } private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = { val lookupPath = p.elements.mkString("/", "/", "") - val deploy = deployer.lookup(lookupPath) - r.adaptFromDeploy(deploy) + r.adaptFromDeploy(deployer.lookup(lookupPath)) } - def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { - import akka.dispatch.DefaultPromise + def ask(within: Timeout): Option[AskActorRef] = { (if (within == null) settings.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ - new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout + None case t ⇒ val path = tempPath() val name = path.name @@ -552,8 +543,7 @@ class LocalActorRefProvider( } } tempContainer.addChild(name, a) - recipient.tell(message, a) - a.result + Some(a) } } } diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 62fa4d76fe..6b30ee351a 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -156,7 +156,7 @@ object DeploymentConfig { } def routerTypeFor(routing: Routing): RouterType = routing match { - case _: NoRouting | NoRouting ⇒ RouterType.NoRouting + case _: NoRouting | NoRouting ⇒ RouterType.NoRouter case _: RoundRobin | RoundRobin ⇒ RouterType.RoundRobin case _: Random | Random ⇒ RouterType.Random case _: ScatterGather | ScatterGather ⇒ RouterType.ScatterGather diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 6ea19bca6e..7425db85bb 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -8,7 +8,7 @@ import akka.dispatch._ import akka.japi.Creator import akka.util._ import collection.immutable.Stack -import akka.routing.{ NoRouting, RouterConfig, RoutedProps } +import akka.routing.{ NoRouter, RouterConfig } /** * ActorRef configuration object, this is threadsafe and fully sharable @@ -29,7 +29,7 @@ object Props { case _ ⇒ Escalate } - final val defaultRoutedProps: RouterConfig = NoRouting + final val defaultRoutedProps: RouterConfig = NoRouter final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(defaultDecider, None, None) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index dd94253d57..034993bd22 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -7,12 +7,15 @@ package akka.routing import akka.actor._ import akka.japi.Creator -import akka.util.ReflectiveAccess import java.lang.reflect.InvocationTargetException import akka.config.ConfigurationException -import akka.routing.Routing.Broadcast import akka.actor.DeploymentConfig.Deploy import java.util.concurrent.atomic.AtomicInteger +import akka.util.ReflectiveAccess +import akka.AkkaException +import scala.collection.JavaConversions._ +import akka.routing.Routing.{ Destination, Broadcast } +import java.util.concurrent.TimeUnit sealed trait RouterType @@ -26,7 +29,7 @@ object RouterType { /** * A RouterType that indicates no routing - i.e. direct message. */ - object NoRouting extends RouterType + object NoRouter extends RouterType /** * A RouterType that randomly selects a connection to send a message to. @@ -38,16 +41,16 @@ object RouterType { */ object RoundRobin extends RouterType - /** - * A RouterType that selects the connection by using scatter gather. - */ - object ScatterGather extends RouterType - /** * A RouterType that broadcasts the messages to all connections. */ object Broadcast extends RouterType + /** + * A RouterType that selects the connection by using scatter gather. + */ + object ScatterGather extends RouterType + /** * A RouterType that selects the connection based on the least amount of cpu usage */ @@ -67,9 +70,13 @@ object RouterType { * A user-defined custom RouterType. */ case class Custom(implClass: String) extends RouterType - } +/** + * An {@link AkkaException} thrown when something goes wrong while routing a message + */ +class RoutingException(message: String) extends AkkaException(message) + /** * Contains the configuration to create local and clustered routed actor references. * Routed ActorRef configuration object, this is thread safe and fully sharable. @@ -84,51 +91,9 @@ case class RoutedProps private[akka] ( } } -///** -// * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the -// * {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}. -// * -// * @author Jonas Bonér -// */ -//trait Router { -// -// /** -// * Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for -// * the current connections, signal that there were problems with one of the connections and see if there have -// * been changes in the connections. -// * -// * This method is not threadsafe, and should only be called once -// * -// * JMM Guarantees: -// * This method guarantees that all changes made in this method, are visible before one of the routing methods is called. -// */ -// def init(connectionManager: ConnectionManager) -// -// /** -// * Routes the message to one of the connections. -// * -// * @throws RoutingException if something goes wrong while routing the message -// */ -// def route(message: Any)(implicit sender: ActorRef) -// -// /** -// * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the -// * completion of the processing of the message. -// * -// * @throws RoutingExceptionif something goes wrong while routing the message. -// */ -// def route[T](message: Any, timeout: Timeout): Future[T] -//} -// -///** -// * An {@link AkkaException} thrown when something goes wrong while routing a message -// */ -//class RoutingException(message: String) extends AkkaException(message) -// - /** - * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to - * on (or more) of these actors. + * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to + * send a message to on (or more) of these actors. */ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, _path: ActorPath) extends LocalActorRef( @@ -140,40 +105,37 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup val route: Routing.Route = _props.routerConfig.createRoute(_props.creator, actorContext) override def !(message: Any)(implicit sender: ActorRef = null) { - route(message) match { - case null ⇒ super.!(message)(sender) - case ref: ActorRef ⇒ ref.!(message)(sender) - case refs: Traversable[ActorRef] ⇒ refs foreach (_.!(message)(sender)) + val s = if (sender eq null) underlying.system.deadLetters else sender + + val msg = message match { + case Broadcast(m) ⇒ m + case m ⇒ m + } + + route(s, message) match { + case Nil ⇒ super.!(message)(s) + case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender)) } } } trait RouterConfig extends Function0[Actor] { def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig + def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Routing.Route } /** - * Routing configuration that indicates no routing. - * Oxymoron style. - */ -case object NoRouting extends RouterConfig { - - def adaptFromDeploy(deploy: Option[Deploy]) = null - - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null - - def apply(): Actor = null -} - -/** - * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the - * {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}. + * A Router is responsible for sending a message to one (or more) of its connections. * * @author Jonas Bonér */ trait Router { - // TODO (HE): implement failure detection + def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[ActorRef]): Vector[ActorRef] = (nrOfInstances, targets) match { + case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) + case (_, xs) ⇒ Vector.empty[ActorRef] ++ xs + } } /** @@ -203,7 +165,20 @@ object Routing { } } - type Route = (Any) ⇒ AnyRef + case class Destination(sender: ActorRef, recipient: ActorRef) + type Route = (ActorRef, Any) ⇒ Iterable[Destination] +} + +/** + * Routing configuration that indicates no routing. + * Oxymoron style. + */ +case object NoRouter extends RouterConfig { + def adaptFromDeploy(deploy: Option[Deploy]) = null + + def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null + + def apply(): Actor = null } /** @@ -220,10 +195,220 @@ object Routing { case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { deploy match { - case Some(d) ⇒ copy(nrOfInstances = d.nrOfInstances.factor) - case _ ⇒ this + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this + } + } + + def apply(): Actor = new Actor { + def receive = { + case _ ⇒ + } + } + + def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + val routees: Vector[ActorRef] = + createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) + + val next = new AtomicInteger(0) + + def getNext(): ActorRef = { + routees(next.getAndIncrement % routees.size) + } + + { (sender, message) ⇒ + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(sender, _)) + case msg ⇒ List(Destination(sender, getNext())) + } + } + } +} + +/** + * A Router that randomly selects one of the target connections to send a message to. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) + extends Router with RouterConfig { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { + deploy match { + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this + } + } + + def apply(): Actor = new Actor { + def receive = { + case _ ⇒ + } + } + + import java.security.SecureRandom + + private val random = new ThreadLocal[SecureRandom] { + override def initialValue = SecureRandom.getInstance("SHA1PRNG") + } + + def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + val routees: Vector[ActorRef] = + createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) + + def getNext(): ActorRef = { + routees(random.get.nextInt(routees.size)) + } + + { (sender, message) ⇒ + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(sender, _)) + case msg ⇒ List(Destination(sender, getNext())) + } + } + } +} + +/** + * A Router that uses broadcasts a message to all its connections. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) + extends Router with RouterConfig { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { + deploy match { + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this + } + } + + def apply(): Actor = new Actor { + def receive = { + case _ ⇒ + } + } + + def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { + val routees: Vector[ActorRef] = + createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) + + { (sender, message) ⇒ + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(sender, _)) + case msg ⇒ routees map (Destination(sender, _)) + } + } + } +} + +/** + * Simple router that broadcasts the message to all routees, and replies with the first response. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { + deploy match { + case Some(d) ⇒ + // In case there is a config then use this over any programmed settings. + copy(nrOfInstances = d.nrOfInstances.factor, targets = Nil) + case _ ⇒ this } } @@ -236,307 +421,18 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { val routees: Vector[ActorRef] = (nrOfInstances, targets) match { case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ - println("----> 0, Nil") - (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouting)))(scala.collection.breakOut) - case (x, xs) ⇒ - println("----> x, xs") - Vector.empty[ActorRef] ++ xs + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) + case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs } - val next = new AtomicInteger(0) - - def getNext(): ActorRef = { - routees(next.getAndIncrement % routees.size) - } - - { - case _: AutoReceivedMessage ⇒ null //TODO: handle system specific messages - case Broadcast(msg) ⇒ routees - case msg ⇒ getNext() + { (sender, message) ⇒ + val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME! + asker.result.pipeTo(sender) + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(asker, _)) + case msg ⇒ routees map (Destination(asker, _)) + } } } - - /* - private val state = new AtomicReference[RoundRobinState] - - def next: Option[ActorRef] = currentState.next - - @tailrec - private def currentState: RoundRobinState = { - val current = state.get - - if (current != null && current.version == connectionManager.version) { - //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state. - current - } else { - //there has been a change in connections, or it was the first try, so we need to update the internal state - - val connections = connectionManager.connections - val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version) - if (state.compareAndSet(current, newState)) - //we are lucky since we just updated the state, so we can send it back as the state to use - newState - else //we failed to update the state, lets try again... better luck next time. - currentState - } - } - - private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) { - - private val index = new AtomicInteger(0) - - def next: Option[ActorRef] = if (array.isEmpty) None else Some(array(nextIndex)) - - @tailrec - private def nextIndex: Int = { - val oldIndex = index.get - var newIndex = if (oldIndex == array.length - 1) 0 else oldIndex + 1 - - if (!index.compareAndSet(oldIndex, newIndex)) nextIndex - else oldIndex - } - } - */ -} - -///** -// * An Abstract Router implementation that already provides the basic infrastructure so that a concrete -// * Router only needs to implement the next method. -// */ -//trait BasicRouter extends Router { -// -// @volatile -// protected var connectionManager: ConnectionManager = _ -// -// def init(connectionManager: ConnectionManager) = { -// this.connectionManager = connectionManager -// } -// -// def route(message: Any)(implicit sender: ActorRef) = message match { -// case Routing.Broadcast(message) ⇒ -// -// //it is a broadcast message, we are going to send to message to all connections. -// connectionManager.connections.iterable foreach { -// connection ⇒ -// try { -// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// } -// case _ ⇒ -// //it no broadcast message, we are going to select an actor from the connections and send the message to him. -// next match { -// case Some(connection) ⇒ -// try { -// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// case None ⇒ -// throwNoConnectionsError -// } -// } -// -// def route[T](message: Any, timeout: Timeout): Future[T] = message match { -// case Routing.Broadcast(message) ⇒ -// throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.") -// case _ ⇒ -// //it no broadcast message, we are going to select an actor from the connections and send the message to him. -// next match { -// case Some(connection) ⇒ -// try { -// connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?? -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// case None ⇒ -// throwNoConnectionsError -// } -// } -// -// protected def next: Option[ActorRef] -// -// private def throwNoConnectionsError = throw new RoutingException("No replica connections for router") -//} -// -///** -// * A Router that uses broadcasts a message to all its connections. -// */ -//class BroadcastRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter with Serializable { -// override def route(message: Any)(implicit sender: ActorRef) = { -// connectionManager.connections.iterable foreach { -// connection ⇒ -// try { -// connection.!(message)(sender) // we use original sender, so this is essentially a 'forward' -// } catch { -// case e: Exception ⇒ -// connectionManager.remove(connection) -// throw e -// } -// } -// } -// -// //protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = -// override def route[T](message: Any, timeout: Timeout): Future[T] = { -// import Future._ -// implicit val t = timeout -// val futures = connectionManager.connections.iterable map { -// connection ⇒ -// connection.?(message, timeout).asInstanceOf[Future[T]] -// } -// Future.firstCompletedOf(futures) -// } -// -// protected def next: Option[ActorRef] = None -//} -// -///** -// * A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef. -// * -// * @author Jonas Bonér -// */ -//class DirectRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter { -// -// private val state = new AtomicReference[DirectRouterState] -// -// lazy val next: Option[ActorRef] = { -// val current = currentState -// if (current.ref == null) None else Some(current.ref) -// } -// -// @tailrec -// private def currentState: DirectRouterState = { -// val current = state.get -// -// if (current != null && connectionManager.version == current.version) { -// //we are lucky since nothing has changed in the connections. -// current -// } else { -// //there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating. -// -// val connections = connectionManager.connections -// -// val connectionCount = connections.iterable.size -// if (connectionCount > 1) -// throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount)) -// -// val newState = new DirectRouterState(connections.iterable.head, connections.version) -// if (state.compareAndSet(current, newState)) -// //we are lucky since we just updated the state, so we can send it back as the state to use -// newState -// else //we failed to update the state, lets try again... better luck next time. -// currentState // recur -// } -// } -// -// private case class DirectRouterState(ref: ActorRef, version: Long) -// -//} -// -///** -// * A Router that randomly selects one of the target connections to send a message to. -// * -// * @author Jonas Bonér -// */ -//class RandomRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends BasicRouter { -// -// import java.security.SecureRandom -// -// private val state = new AtomicReference[RandomRouterState] -// -// private val random = new ThreadLocal[SecureRandom] { -// override def initialValue = SecureRandom.getInstance("SHA1PRNG") -// } -// -// def next: Option[ActorRef] = currentState.array match { -// case a if a.isEmpty ⇒ None -// case a ⇒ Some(a(random.get.nextInt(a.length))) -// } -// -// @tailrec -// private def currentState: RandomRouterState = { -// val current = state.get -// -// if (current != null && current.version == connectionManager.version) { -// //we are lucky, since there has not been any change in the connections. So therefor we can use the existing state. -// current -// } else { -// //there has been a change in connections, or it was the first try, so we need to update the internal state -// -// val connections = connectionManager.connections -// val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version) -// if (state.compareAndSet(current, newState)) -// //we are lucky since we just updated the state, so we can send it back as the state to use -// newState -// else //we failed to update the state, lets try again... better luck next time. -// currentState -// } -// } -// -// private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long) -//} - -/** - * ScatterGatherRouter broadcasts the message to all connections and gathers results according to the - * specified strategy (specific router needs to implement `gather` method). - * Scatter-gather pattern will be applied only to the messages broadcasted using Future - * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget - * mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type - * - * FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected. - * FIXME: this is also the location where message buffering should be done in case of failure. - */ -/* -trait ScatterGatherRouter extends BasicRouter with Serializable { - - /** - * Aggregates the responses into a single Future. - * - * @param results Futures of the responses from connections - */ - protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] - - private def scatterGather[S, G >: S](message: Any, timeout: Timeout): Future[G] = { - val responses = connectionManager.connections.iterable.flatMap { - actor ⇒ - try { - if (actor.isTerminated) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace - Some(actor.?(message, timeout).asInstanceOf[Future[S]]) - } catch { - case e: Exception ⇒ - connectionManager.remove(actor) - None - } - } - - if (responses.isEmpty) - throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message)) - else gather(responses) - } - - override def route[T](message: Any, timeout: Timeout): Future[T] = message match { - case Routing.Broadcast(message) ⇒ scatterGather(message, timeout) - case message ⇒ super.route(message, timeout) - } -} -*/ - -/** - * Simple router that broadcasts the message to all connections, and replies with the first response - * Scatter-gather pattern will be applied only to the messages broadcasted using Future - * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget - * mode, the router would behave as {@link RoundRobinRouter} - */ -/* -class ScatterGatherFirstCompletedRouter(implicit dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter { - protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results) -} -*/ +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index b1093bb943..0b8044f9c4 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -7,7 +7,6 @@ package akka.remote import akka.actor._ import akka.actor.Status._ import akka.event.Logging -import akka.util.duration._ import akka.util.Duration import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index b8e00702a8..b67b7805be 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -68,10 +68,15 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti private var _server: RemoteSupport[ParsedTransportAddress] = _ def server = _server - def init(system: ActorSystemImpl) = { + @volatile + private var _provider: RemoteActorRefProvider = _ + def provider = _provider + + def init(system: ActorSystemImpl, provider: RemoteActorRefProvider) = { val log = Logging(system, "Remote") + _provider = provider _serialization = SerializationExtension(system) _computeGridDispatcher = system.dispatcherFactory.fromConfig("akka.remote.compute-grid-dispatcher") _remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 2d974e25fd..391198efb3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -4,24 +4,14 @@ package akka.remote -import akka.AkkaException import akka.actor._ -import akka.actor.Actor._ -import akka.actor.Status._ -import akka.routing._ import akka.dispatch._ -import akka.util.duration._ -import akka.config.ConfigurationException -import akka.event.{ DeathWatch, Logging } +import akka.event.Logging import akka.serialization.Compression.LZF import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import com.google.protobuf.ByteString -import java.util.concurrent.atomic.AtomicBoolean import akka.event.EventStream -import java.util.concurrent.ConcurrentHashMap -import akka.dispatch.Promise -import java.net.InetAddress import akka.serialization.SerializationExtension import akka.serialization.Serialization import akka.config.ConfigurationException @@ -62,7 +52,7 @@ class RemoteActorRefProvider( def init(system: ActorSystemImpl) { local.init(system) - remote.init(system) + remote.init(system, this) local.registerExtraNames(Map(("remote", remote.remoteDaemon))) terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) } @@ -145,7 +135,7 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) + def ask(within: Timeout): Option[AskActorRef] = local.ask(within) /** * Using (checking out) actor on a specific node. @@ -178,7 +168,7 @@ class RemoteActorRefProvider( * @author Jonas Bonér */ private[akka] class RemoteActorRef private[akka] ( - provider: ActorRefProvider, + provider: RemoteActorRefProvider, remote: RemoteSupport[ParsedTransportAddress], val path: ActorPath, val getParent: InternalActorRef, @@ -203,7 +193,16 @@ private[akka] class RemoteActorRef private[akka] ( override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader) - override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout) + override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { + provider.ask(timeout) match { + case Some(a) ⇒ + this.!(message)(a) + a.result + case None ⇒ + this.!(message)(null) + new DefaultPromise[Any](0)(provider.dispatcher) + } + } def suspend(): Unit = sendSystemMessage(Suspend()) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 5963bb23a2..3dc0bf26fc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -149,5 +149,5 @@ class RemoteConnectionManager( } private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) = - new RemoteActorRef(system.provider, remote.server, actorPath, Nobody, None) + new RemoteActorRef(remote.provider, remote.server, actorPath, Nobody, None) } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index e482597846..42b9d062a5 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -6,13 +6,12 @@ package akka.tutorial.first.java; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.InternalActorRef; +import akka.actor.Props; import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; -import akka.japi.Creator; -import akka.routing.*; +import akka.routing.RoundRobinRouter; -import java.util.LinkedList; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; public class Pi { @@ -105,23 +104,8 @@ public class Pi { this.nrOfMessages = nrOfMessages; this.nrOfElements = nrOfElements; this.latch = latch; - Creator routerCreator = new Creator() { - public Router create() { - // TODO (HE) : implement - //return new RoundRobinRouter(getContext().dispatcher(), new akka.actor.Timeout(-1)); - return null; - } - }; - LinkedList actors = new LinkedList() { - { - for (int i = 0; i < nrOfWorkers; i++) add(getContext().actorOf(Worker.class)); - } - }; - // FIXME routers are intended to be used like this - // TODO (HE): implement - //RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true); - //router = new RoutedActorRef(getContext().system(), props, (InternalActorRef) getSelf(), "pi"); + router = this.getContext().actorOf(new Props().withCreator(Worker.class).withRouting(new RoundRobinRouter(5)), "pi"); } // message handler @@ -170,7 +154,7 @@ public class Pi { final CountDownLatch latch = new CountDownLatch(1); // create the master - ActorRef master = system.actorOf(new UntypedActorFactory() { + ActorRef master = system.actorOf(new akka.actor.UntypedActorFactory() { public UntypedActor create() { return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); } diff --git a/akka-tutorials/akka-tutorial-first/src/main/resources/akka.conf b/akka-tutorials/akka-tutorial-first/src/main/resources/akka.conf index 13b0c06eb1..17517bccbe 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/resources/akka.conf +++ b/akka-tutorials/akka-tutorial-first/src/main/resources/akka.conf @@ -1,6 +1,6 @@ akka.actor.deployment { - /user/pi2 { + /user/master/pi { router = round-robin - nr-of-instances = 4 + nr-of-instances = 10 } -} \ No newline at end of file +} diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 233c6ae26d..6634eef783 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -6,12 +6,12 @@ package akka.tutorial.first.scala import java.util.concurrent.CountDownLatch import akka.actor._ import akka.routing._ -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{ ConfigFactory, Config } object Pi extends App { // Initiate the calculation - calculate(nrOfWorkers = 4, nrOfElements = 10, nrOfMessages = 10) + calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) // ==================== // ===== Messages ===== @@ -39,7 +39,6 @@ object Pi extends App { def receive = { case Work(start, nrOfElements) ⇒ - println("*** RECEIVED MESSAGE IN: " + self.path) sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work } } @@ -54,27 +53,8 @@ object Pi extends App { var nrOfResults: Int = _ var start: Long = _ - // create the workers - - var workers = Vector.empty[ActorRef] - for (i ← 1 to 2) { - workers = context.actorOf[Worker] +: workers - } - - // TODO (HE) : use this way of creating actors - //val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker]) - - /* - // wrap them with a load-balancing router - // FIXME routers are intended to be used like this - implicit val timout = context.system.settings.ActorTimeout - implicit val dispatcher = context.dispatcher - val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers)) - val router = new RoutedActorRef(context.system, props, self.asInstanceOf[InternalActorRef], "pi") - */ - - //val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi") - val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi") + // create a round robin router for the workers + val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi") // message handler def receive = { @@ -107,7 +87,7 @@ object Pi extends App { // ===== Run it ===== // ================== def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - val system = ActorSystem("x", ConfigFactory.parseString("akka.actor.debug.lifecycle=true\nakka.loglevel=DEBUG")) + val system = ActorSystem() // this latch is only plumbing to know when the calculation is completed val latch = new CountDownLatch(1)