diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index d4d745ae8f..751092827e 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -131,32 +131,6 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter1.get must be(1) counter2.get must be(1) } - - // TODO (HE) : Is this still a valid test case? - /* - "fail to deliver a broadcast message using the ?" in { - val doneLatch = new CountDownLatch(1) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter1.incrementAndGet() - } - }) - - val routedActor = system.actorOf(Props(new TestActor).withRouting(RoundRobinRouter(targets = List(connection1)))) - - intercept[RoutingException] { - routedActor ? Broadcast(1) - } - - routedActor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - counter1.get must be(0) - } - */ - } "random router" must { @@ -195,35 +169,6 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter1.get must be(1) counter2.get must be(1) } - - // TODO (HE) : Is this still a valid test case? - /* - "fail to deliver a broadcast message using the ?" in { - val doneLatch = new CountDownLatch(1) - - val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter1.incrementAndGet() - } - }) - - val props = RoutedProps(routerFactory = () ⇒ new RandomRouter, connectionManager = new LocalConnectionManager(List(connection1))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - try { - actor ? Broadcast(1) - fail() - } catch { - case e: RoutingException ⇒ - } - - actor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) - counter1.get must be(0) - } - */ } "broadcast router" must { @@ -293,129 +238,35 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { } } - // TODO (HE) : add tests below - /* - -"Scatter-gather router" must { - - "return response, even if one of the actors has stopped" in { - val shutdownLatch = new TestLatch(1) - val actor1 = newActor(1, Some(shutdownLatch)) - val actor2 = newActor(2, Some(shutdownLatch)) - val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2)))) - - routedActor ! Broadcast(Stop(Some(1))) - shutdownLatch.await - (routedActor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - } - - "throw an exception, if all the connections have stopped" in { - - val shutdownLatch = new TestLatch(2) - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0, Some(shutdownLatch)), newActor(1, Some(shutdownLatch))))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(Stop()) - - shutdownLatch.await - - (intercept[RoutingException] { - actor ? Broadcast(0) - }) must not be (null) - - } - - "return the first response from connections, when all of them replied" in { - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1)))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - (actor ? Broadcast("Hi!")).get.asInstanceOf[Int] must be(0) - - } - - "return the first response from connections, when some of them failed to reply" in { - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0), newActor(1)))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - (actor ? Broadcast(0)).get.asInstanceOf[Int] must be(1) - } + "Scatter-gather router" must { "be started when constructed" in { - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(newActor(0)))) - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor.isTerminated must be(false) - } - - "deliver one-way messages in a round robin fashion" in { - val connectionCount = 10 - val iterationCount = 10 - val doneLatch = new TestLatch(connectionCount) - - var connections = new LinkedList[ActorRef] - var counters = new LinkedList[AtomicInteger] - for (i ← 0 until connectionCount) { - counters = counters :+ new AtomicInteger() - - val connection = system.actorOf(new Actor { - def receive = { - case "end" ⇒ doneLatch.countDown() - case msg: Int ⇒ counters.get(i).get.addAndGet(msg) - } - }) - connections = connections :+ connection - } - - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(connections)) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - for (i ← 0 until iterationCount) { - for (k ← 0 until connectionCount) { - actor ! (k + 1) - } - } - - actor ! Broadcast("end") - - doneLatch.await - - for (i ← 0 until connectionCount) { - val counter = counters.get(i).get - counter.get must be((iterationCount * (i + 1))) - } + val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(newActor(0))))) + routedActor.isTerminated must be(false) } "deliver a broadcast message using the !" in { val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger - val connection1 = system.actorOf(new Actor { + val actor1 = system.actorOf(new Actor { def receive = { - case "end" ⇒ doneLatch.countDown() + case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter1.addAndGet(msg) } }) val counter2 = new AtomicInteger - val connection2 = system.actorOf(new Actor { + val actor2 = system.actorOf(new Actor { def receive = { - case "end" ⇒ doneLatch.countDown() + case "end" ⇒ doneLatch.countDown() case msg: Int ⇒ counter2.addAndGet(msg) } }) - val props = RoutedProps(routerFactory = () ⇒ new ScatterGatherFirstCompletedRouter, connectionManager = new LocalConnectionManager(List(connection1, connection2))) - - val actor = new RoutedActorRef(system, props, impl.guardian, "foo") - - actor ! Broadcast(1) - actor ! Broadcast("end") + val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2)))) + routedActor ! Broadcast(1) + routedActor ! Broadcast("end") doneLatch.await @@ -423,31 +274,33 @@ class RoutingSpec extends AkkaSpec with DefaultTimeout { counter2.get must be(1) } + "return response, even if one of the actors has stopped" in { + val shutdownLatch = new TestLatch(1) + val actor1 = newActor(1, Some(shutdownLatch)) + val actor2 = newActor(22, Some(shutdownLatch)) + val routedActor = system.actorOf(Props(new TestActor).withRouting(ScatterGatherFirstCompletedRouter(targets = List(actor1, actor2)))) - case class Stop(id: Option[Int] = None) + routedActor ! Broadcast(Stop(Some(1))) + shutdownLatch.await + (routedActor ? Broadcast(0)).as[Int].get must be(22) + } - def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(new Actor { - def receive = { - case Stop(None) ⇒ - println(">>>> STOPPING : " + id) - self.stop() - case Stop(Some(_id)) if (_id == id) ⇒ - println(">>>> STOPPING >: " + id) - self.stop() - case _id: Int if (_id == id) ⇒ - println("-----> ID MATCH - do nothing") - case x ⇒ { - Thread sleep 100 * id - println("-----> SENDING REPLY: " + id) - sender.tell(id) + case class Stop(id: Option[Int] = None) + + def newActor(id: Int, shudownLatch: Option[TestLatch] = None) = system.actorOf(Props(new Actor { + def receive = { + case Stop(None) ⇒ self.stop() + case Stop(Some(_id)) if (_id == id) ⇒ self.stop() + case _id: Int if (_id == id) ⇒ + case x ⇒ { + Thread sleep 100 * id + sender.tell(id) + } } - } - override def postStop = { - println("***** POSTSTOP") - shudownLatch foreach (_.countDown()) - } - }) -} -*/ + override def postStop = { + shudownLatch foreach (_.countDown()) + } + }), "Actor:" + id) + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 0c72fc8901..beedb82a03 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -299,7 +299,16 @@ private[akka] class LocalActorRef private[akka] ( def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender) - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = actorCell.provider.ask(message, this, timeout) + def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { + actorCell.provider.ask(timeout) match { + case Some(a) ⇒ + this.!(message)(a) + a.result + case None ⇒ + this.!(message)(null) + new DefaultPromise[Any](0)(system.dispatcher) + } + } def restart(cause: Throwable): Unit = actorCell.restart(cause) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 9f0a31735d..39dd1e4ba9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -5,21 +5,13 @@ package akka.actor import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.{ ConcurrentHashMap, TimeUnit } -import scala.annotation.tailrec import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } -import akka.actor.Timeout.intToTimeout -import akka.config.ConfigurationException import akka.dispatch._ import akka.routing._ import akka.AkkaException import akka.util.{ Duration, Switch, Helpers } -import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import akka.event._ -import akka.event.Logging.Error._ -import akka.event.Logging.Warning import java.io.Closeable -import com.typesafe.config.Config /** * Interface for all ActorRef providers to implement. @@ -105,9 +97,10 @@ trait ActorRefProvider { def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef /** - * Create AskActorRef to hook up message send to recipient with Future receiver. + * Create AskActorRef and register it properly so it can be serialized/deserialized; + * caller needs to send the message. */ - def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] + def ask(within: Timeout): Option[AskActorRef] /** * This Future is completed upon termination of this ActorRefProvider, which @@ -530,11 +523,10 @@ class LocalActorRefProvider( r.adaptFromDeploy(deployer.lookup(lookupPath)) } - def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { - import akka.dispatch.DefaultPromise + def ask(within: Timeout): Option[AskActorRef] = { (if (within == null) settings.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ - new DefaultPromise[Any](0)(dispatcher) //Abort early if nonsensical timeout + None case t ⇒ val path = tempPath() val name = path.name @@ -544,8 +536,7 @@ class LocalActorRefProvider( } } tempContainer.addChild(name, a) - recipient.tell(message, a) - a.result + Some(a) } } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index f30f3b3257..034993bd22 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -9,13 +9,13 @@ import akka.actor._ import akka.japi.Creator import java.lang.reflect.InvocationTargetException import akka.config.ConfigurationException -import akka.routing.Routing.Broadcast import akka.actor.DeploymentConfig.Deploy import java.util.concurrent.atomic.AtomicInteger -import akka.dispatch.Future -import akka.util.{ Duration, ReflectiveAccess } -import java.util.concurrent.TimeUnit +import akka.util.ReflectiveAccess import akka.AkkaException +import scala.collection.JavaConversions._ +import akka.routing.Routing.{ Destination, Broadcast } +import java.util.concurrent.TimeUnit sealed trait RouterType @@ -41,16 +41,16 @@ object RouterType { */ object RoundRobin extends RouterType - /** - * A RouterType that selects the connection by using scatter gather. - */ - object ScatterGather extends RouterType - /** * A RouterType that broadcasts the messages to all connections. */ object Broadcast extends RouterType + /** + * A RouterType that selects the connection by using scatter gather. + */ + object ScatterGather extends RouterType + /** * A RouterType that selects the connection based on the least amount of cpu usage */ @@ -70,7 +70,6 @@ object RouterType { * A user-defined custom RouterType. */ case class Custom(implClass: String) extends RouterType - } /** @@ -106,20 +105,18 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup val route: Routing.Route = _props.routerConfig.createRoute(_props.creator, actorContext) override def !(message: Any)(implicit sender: ActorRef = null) { - route(message) match { - case null ⇒ super.!(message)(sender) - case ref: ActorRef ⇒ ref.!(message)(sender) - case refs: Traversable[ActorRef] ⇒ - message match { - case Broadcast(m) ⇒ refs foreach (_.!(m)(sender)) - case _ ⇒ refs foreach (_.!(message)(sender)) - } + val s = if (sender eq null) underlying.system.deadLetters else sender + + val msg = message match { + case Broadcast(m) ⇒ m + case m ⇒ m + } + + route(s, message) match { + case Nil ⇒ super.!(message)(s) + case refs ⇒ refs foreach (p ⇒ p.recipient.!(msg)(p.sender)) } } - - // TODO (HE) : Should the RoutedActorRef also override "?"? - // If not how then Broadcast messages cannot be sent via ? - - // which it is in some test cases at the moment. } trait RouterConfig extends Function0[Actor] { @@ -134,6 +131,11 @@ trait RouterConfig extends Function0[Actor] { * @author Jonas Bonér */ trait Router { + def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[ActorRef]): Vector[ActorRef] = (nrOfInstances, targets) match { + case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") + case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) + case (_, xs) ⇒ Vector.empty[ActorRef] ++ xs + } } /** @@ -163,7 +165,8 @@ object Routing { } } - type Route = (Any) ⇒ AnyRef + case class Destination(sender: ActorRef, recipient: ActorRef) + type Route = (ActorRef, Any) ⇒ Iterable[Destination] } /** @@ -192,6 +195,22 @@ case object NoRouter extends RouterConfig { case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { deploy match { case Some(d) ⇒ @@ -208,11 +227,8 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] } def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { - val routees: Vector[ActorRef] = (nrOfInstances, targets) match { - case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) - case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs - } + val routees: Vector[ActorRef] = + createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) val next = new AtomicInteger(0) @@ -220,10 +236,12 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] routees(next.getAndIncrement % routees.size) } - { - case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? - case Broadcast(msg) ⇒ routees - case msg ⇒ getNext() + { (sender, message) ⇒ + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(sender, _)) + case msg ⇒ List(Destination(sender, getNext())) + } } } } @@ -242,6 +260,22 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { deploy match { case Some(d) ⇒ @@ -264,20 +298,19 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni } def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { - val routees: Vector[ActorRef] = (nrOfInstances, targets) match { - case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) - case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs - } + val routees: Vector[ActorRef] = + createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) def getNext(): ActorRef = { routees(random.get.nextInt(routees.size)) } - { - case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? - case Broadcast(msg) ⇒ routees - case msg ⇒ getNext() + { (sender, message) ⇒ + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(sender, _)) + case msg ⇒ List(Destination(sender, getNext())) + } } } } @@ -296,6 +329,22 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Ni case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { deploy match { case Some(d) ⇒ @@ -312,29 +361,48 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = } def createRoute(creator: () ⇒ Actor, context: ActorContext): Routing.Route = { - val routees: Vector[ActorRef] = (nrOfInstances, targets) match { - case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") - case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(context.props.copy(creator = creator, routerConfig = NoRouter)))(scala.collection.breakOut) - case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs - } + val routees: Vector[ActorRef] = + createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) - { - case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? - case Broadcast(msg) ⇒ routees - case msg ⇒ routees + { (sender, message) ⇒ + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(sender, _)) + case msg ⇒ routees map (Destination(sender, _)) + } } } } -// TODO (HE) : Correct description below /** - * Simple router that broadcasts the message to all connections, and replies with the first response. - * Scatter-gather pattern will be applied only to the messages broadcast using Future - * (wrapped into {@link Routing.Broadcast} and sent with "?" method). - * For the messages sent in a fire-forget mode, the router would behave as {@link RoundRobinRouter} + * Simple router that broadcasts the message to all routees, and replies with the first response. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[ActorRef] = Nil) extends Router with RouterConfig { + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = { + this(nrOfInstances = nr) + } + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(t: java.util.Collection[ActorRef]) = { + this(targets = collectionAsScalaIterable(t)) + } + def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { deploy match { case Some(d) ⇒ @@ -357,28 +425,14 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: It case (x, xs) ⇒ Vector.empty[ActorRef] ++ xs } - def scatterGather[S, G >: S](message: Any, t: Timeout): Future[G] = { - val responses = routees.flatMap { actor ⇒ - try { - if (actor.isTerminated) None else Some(actor.?(message, t).asInstanceOf[Future[S]]) - } catch { - case e: Exception ⇒ None - } + { (sender, message) ⇒ + val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME! + asker.result.pipeTo(sender) + message match { + case msg: AutoReceivedMessage ⇒ Nil + case Broadcast(msg) ⇒ routees map (Destination(asker, _)) + case msg ⇒ routees map (Destination(asker, _)) } - - if (!responses.isEmpty) throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message)) - else { - implicit val messageDispatcher = context.dispatcher - implicit val timeout = t - Future.firstCompletedOf(responses) - } - } - - // TODO (HE) : Timeout and Future should be updated to new strategy - or hardcoded value below should at least be removed! - { - case msg: AutoReceivedMessage ⇒ null // TODO (HE): how should system messages be handled? - case Broadcast(msg) ⇒ routees - case msg ⇒ scatterGather(msg, Timeout(Duration(5000, TimeUnit.MILLISECONDS))) } } -} +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 7b6fd8a660..da46471c5e 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -7,7 +7,6 @@ package akka.remote import akka.actor._ import akka.actor.Status._ import akka.event.Logging -import akka.util.duration._ import akka.util.Duration import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ @@ -161,7 +160,7 @@ class Gossiper(remote: Remote) { node ← oldAvailableNodes if connectionManager.connectionFor(node).isEmpty } { - val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider, remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, Nobody, None) + val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider.asInstanceOf[RemoteActorRefProvider], remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, Nobody, None) connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 0b8ad16654..caefc85838 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -110,7 +110,7 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) + def ask(within: Timeout): Option[AskActorRef] = local.ask(within) /** * Using (checking out) actor on a specific node. @@ -143,7 +143,7 @@ class RemoteActorRefProvider( * @author Jonas Bonér */ private[akka] class RemoteActorRef private[akka] ( - provider: ActorRefProvider, + provider: RemoteActorRefProvider, remote: RemoteSupport, val path: ActorPath, val getParent: InternalActorRef, @@ -168,7 +168,16 @@ private[akka] class RemoteActorRef private[akka] ( override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader) - override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout) + override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { + provider.ask(timeout) match { + case Some(a) ⇒ + this.!(message)(a) + a.result + case None ⇒ + this.!(message)(null) + new DefaultPromise[Any](0)(provider.dispatcher) + } + } def suspend(): Unit = sendSystemMessage(Suspend()) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 49ae8d995c..136ce57da4 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -149,5 +149,5 @@ class RemoteConnectionManager( } private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) = - new RemoteActorRef(remote.system.provider, remote.server, actorPath, Nobody, None) + new RemoteActorRef(remote.system.provider.asInstanceOf[RemoteActorRefProvider], remote.server, actorPath, Nobody, None) } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index e482597846..42b9d062a5 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -6,13 +6,12 @@ package akka.tutorial.first.java; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.InternalActorRef; +import akka.actor.Props; import akka.actor.UntypedActor; -import akka.actor.UntypedActorFactory; -import akka.japi.Creator; -import akka.routing.*; +import akka.routing.RoundRobinRouter; -import java.util.LinkedList; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; public class Pi { @@ -105,23 +104,8 @@ public class Pi { this.nrOfMessages = nrOfMessages; this.nrOfElements = nrOfElements; this.latch = latch; - Creator routerCreator = new Creator() { - public Router create() { - // TODO (HE) : implement - //return new RoundRobinRouter(getContext().dispatcher(), new akka.actor.Timeout(-1)); - return null; - } - }; - LinkedList actors = new LinkedList() { - { - for (int i = 0; i < nrOfWorkers; i++) add(getContext().actorOf(Worker.class)); - } - }; - // FIXME routers are intended to be used like this - // TODO (HE): implement - //RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true); - //router = new RoutedActorRef(getContext().system(), props, (InternalActorRef) getSelf(), "pi"); + router = this.getContext().actorOf(new Props().withCreator(Worker.class).withRouting(new RoundRobinRouter(5)), "pi"); } // message handler @@ -170,7 +154,7 @@ public class Pi { final CountDownLatch latch = new CountDownLatch(1); // create the master - ActorRef master = system.actorOf(new UntypedActorFactory() { + ActorRef master = system.actorOf(new akka.actor.UntypedActorFactory() { public UntypedActor create() { return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch); } diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index f0bdc36547..6634eef783 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -53,12 +53,8 @@ object Pi extends App { var nrOfResults: Int = _ var start: Long = _ - // create the workers - val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker]) - // create a round robin router for the workers val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi") - //val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi") // message handler def receive = { @@ -91,7 +87,7 @@ object Pi extends App { // ===== Run it ===== // ================== def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { - val system = ActorSystem("PiSystem", ConfigFactory.load("akka.conf")) + val system = ActorSystem() // this latch is only plumbing to know when the calculation is completed val latch = new CountDownLatch(1)