diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 6b318c1433..6c0e699800 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -8,6 +8,7 @@ import akka.testkit.AkkaSpec import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import akka.routing._ +import akka.util.duration._ object DeployerSpec { val deployerConf = ConfigFactory.parseString(""" @@ -35,6 +36,7 @@ object DeployerSpec { } /user/service-scatter-gather { router = scatter-gather + within = 2 seconds } } """, ConfigParseOptions.defaults) @@ -116,7 +118,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } "be able to parse 'akka.actor.deployment._' with scatter-gather router" in { - assertRouting(ScatterGatherFirstCompletedRouter(1), "/user/service-scatter-gather") + assertRouting(ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/user/service-scatter-gather") } def assertRouting(expected: RouterConfig, service: String) { diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 617bfa5b5f..0de41a24d7 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -6,11 +6,10 @@ package akka.routing import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import collection.mutable.LinkedList -import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await -import com.typesafe.config.ConfigFactory +import akka.util.Duration object RoutingSpec { @@ -30,18 +29,7 @@ object RoutingSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" - akka { - actor { - deployment { - /a1 { - router = round-robin - nr-of-instances = 3 - } - } - } - } - """)) with DefaultTimeout with ImplicitSender { +class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val impl = system.asInstanceOf[ActorSystemImpl] @@ -72,13 +60,16 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" } "be able to send their routees" in { - val doneLatch = new CountDownLatch(1) + val doneLatch = new TestLatch(1) class TheActor extends Actor { val routee1 = context.actorOf(Props[TestActor], "routee1") val routee2 = context.actorOf(Props[TestActor], "routee2") val routee3 = context.actorOf(Props[TestActor], "routee3") - val router = context.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(routee1, routee2, routee3)))) + val router = context.actorOf(Props[TestActor].withRouter( + ScatterGatherFirstCompletedRouter( + routees = List(routee1, routee2, routee3), + within = 5 seconds))) def receive = { case RouterRoutees(iterable) ⇒ @@ -93,7 +84,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" val theActor = system.actorOf(Props(new TheActor), "theActor") theActor ! "doIt" - doneLatch.await(1, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 1 seconds) } } @@ -314,7 +305,8 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" "Scatter-gather router" must { "be started when constructed" in { - val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(newActor(0))))) + val routedActor = system.actorOf(Props[TestActor].withRouter( + ScatterGatherFirstCompletedRouter(routees = List(newActor(0)), within = 1 seconds))) routedActor.isTerminated must be(false) } @@ -337,7 +329,8 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" } })) - val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2)))) + val routedActor = system.actorOf(Props[TestActor].withRouter( + ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 1 seconds))) routedActor ! Broadcast(1) routedActor ! Broadcast("end") @@ -350,12 +343,13 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" "return response, even if one of the actors has stopped" in { val shutdownLatch = new TestLatch(1) val actor1 = newActor(1, Some(shutdownLatch)) - val actor2 = newActor(22, Some(shutdownLatch)) - val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2)))) + val actor2 = newActor(14, Some(shutdownLatch)) + val routedActor = system.actorOf(Props[TestActor].withRouter( + ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 3 seconds))) routedActor ! Broadcast(Stop(Some(1))) Await.ready(shutdownLatch, TestLatch.DefaultTimeout) - Await.result(routedActor ? Broadcast(0), timeout.duration) must be(22) + Await.result(routedActor ? Broadcast(0), timeout.duration) must be(14) } case class Stop(id: Option[Int] = None) @@ -428,7 +422,10 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" //#crActors //#crRouter - case class VoteCountRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) + case class VoteCountRouter( + nrOfInstances: Int = 0, + routees: Iterable[String] = Nil, + within: Duration = Duration.Zero) extends RouterConfig { //#crRoute diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 14e60ee663..d9249f1ec2 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -78,6 +78,9 @@ akka { # is ignored if routees.paths is given nr-of-instances = 1 + # within is the timeout used for routers containing future calls + within = 5 seconds + # FIXME document 'create-as', ticket 1511 create-as { # fully qualified class name of recipe implementation diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index be9f452dbd..988d6bf126 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -5,7 +5,6 @@ package akka.actor import collection.immutable.Seq -import java.util.concurrent.ConcurrentHashMap import akka.event.Logging import akka.AkkaException import akka.config.ConfigurationException @@ -13,6 +12,7 @@ import akka.util.Duration import akka.event.EventStream import com.typesafe.config._ import akka.routing._ +import java.util.concurrent.{ TimeUnit, ConcurrentHashMap } case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) @@ -53,11 +53,13 @@ class Deployer(val settings: ActorSystem.Settings) { val nrOfInstances = deployment.getInt("nr-of-instances") + val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) + val router: RouterConfig = deployment.getString("router") match { case "from-code" ⇒ NoRouter case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees) case "random" ⇒ RandomRouter(nrOfInstances, routees) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within) case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees) case x ⇒ throw new ConfigurationException("unknown router type " + x + " for path " + key) } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 7f9394fdc5..b4f3709e66 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -5,9 +5,8 @@ package akka.routing import akka.actor._ import java.util.concurrent.atomic.AtomicInteger -import akka.util.Timeout import scala.collection.JavaConversions._ -import java.util.concurrent.TimeUnit +import akka.util.{ Duration, Timeout } /** * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to @@ -84,6 +83,8 @@ trait RouterConfig { def routees: Iterable[String] + def within: Duration + def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route def createActor(): Router = new Router {} @@ -172,8 +173,9 @@ case class Destination(sender: ActorRef, recipient: ActorRef) * Oxymoron style. */ case object NoRouter extends RouterConfig { - def nrOfInstances: Int = 0 - def routees: Iterable[String] = Nil + def nrOfInstances = 0 + def routees = Nil + def within = Duration.Zero def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = null } @@ -191,7 +193,7 @@ object RoundRobinRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with RoundRobinLike { +case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration = Duration.Zero) extends RouterConfig with RoundRobinLike { /** * Constructor that sets nrOfInstances to be created. @@ -244,7 +246,7 @@ object RandomRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with RandomLike { +case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration = Duration.Zero) extends RouterConfig with RandomLike { /** * Constructor that sets nrOfInstances to be created. @@ -302,7 +304,7 @@ object BroadcastRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with BroadcastLike { +case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration = Duration.Zero) extends RouterConfig with BroadcastLike { /** * Constructor that sets nrOfInstances to be created. @@ -335,7 +337,7 @@ trait BroadcastLike { this: RouterConfig ⇒ } object ScatterGatherFirstCompletedRouter { - def apply(routees: Iterable[ActorRef]) = new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString)) + def apply(routees: Iterable[ActorRef], within: Duration) = new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within) } /** * Simple router that broadcasts the message to all routees, and replies with the first response. @@ -348,23 +350,23 @@ object ScatterGatherFirstCompletedRouter { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) +case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration) extends RouterConfig with ScatterGatherFirstCompletedLike { /** * Constructor that sets nrOfInstances to be created. * Java API */ - def this(nr: Int) = { - this(nrOfInstances = nr) + def this(nr: Int, w: Duration) = { + this(nrOfInstances = nr, within = w) } /** * Constructor that sets the routees to be used. * Java API */ - def this(t: java.util.Collection[String]) = { - this(routees = collectionAsScalaIterable(t)) + def this(t: java.util.Collection[String], w: Duration) = { + this(routees = collectionAsScalaIterable(t), within = w) } } @@ -374,7 +376,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ - val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME! + val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get asker.result.pipeTo(sender) message match { case _ ⇒ toAll(asker, ref.routees) diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index 583a3ee22c..63338e8357 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -68,7 +68,7 @@ class ParentActor extends Actor { case "sgfcr" ⇒ //#scatterGatherFirstCompletedRouter val scatterGatherFirstCompletedRouter = context.actorOf( - Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter()), + Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter(within = 2 seconds)), "router") implicit val timeout = context.system.settings.ActorTimeout val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index 0819d34019..ee712d8804 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -26,10 +26,10 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings if (nodes.isEmpty || deploy.routing == NoRouter) d else { val r = deploy.routing match { - case RoundRobinRouter(x, _) ⇒ RemoteRoundRobinRouter(x, nodes) - case RandomRouter(x, _) ⇒ RemoteRandomRouter(x, nodes) - case BroadcastRouter(x, _) ⇒ RemoteBroadcastRouter(x, nodes) - case ScatterGatherFirstCompletedRouter(x, _) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes) + case RoundRobinRouter(x, _, w) ⇒ RemoteRoundRobinRouter(x, nodes, w) + case RandomRouter(x, _, w) ⇒ RemoteRandomRouter(x, nodes, w) + case BroadcastRouter(x, _, w) ⇒ RemoteBroadcastRouter(x, nodes, w) + case ScatterGatherFirstCompletedRouter(x, _, w) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes, w) } Some(deploy.copy(routing = r)) } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala index fc08c222af..42af714d63 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala @@ -6,9 +6,9 @@ package akka.routing import akka.actor._ import akka.remote._ import scala.collection.JavaConverters._ -import java.util.concurrent.atomic.AtomicInteger import com.typesafe.config.ConfigFactory import akka.config.ConfigurationException +import akka.util.Duration trait RemoteRouterConfig extends RouterConfig { override protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Vector[ActorRef] = (nrOfInstances, routees) match { @@ -39,13 +39,13 @@ trait RemoteRouterConfig extends RouterConfig { * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with RoundRobinLike { +case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) extends RemoteRouterConfig with RoundRobinLike { /** * Constructor that sets the routees to be used. * Java API */ - def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) + def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w) } /** @@ -59,13 +59,13 @@ case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String]) * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with RandomLike { +case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) extends RemoteRouterConfig with RandomLike { /** * Constructor that sets the routees to be used. * Java API */ - def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) + def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w) } /** @@ -79,13 +79,13 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String]) ext * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with BroadcastLike { +case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) extends RemoteRouterConfig with BroadcastLike { /** * Constructor that sets the routees to be used. * Java API */ - def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) + def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w) } /** @@ -99,12 +99,12 @@ case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String]) * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String]) +case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) extends RemoteRouterConfig with ScatterGatherFirstCompletedLike { /** * Constructor that sets the routees to be used. * Java API */ - def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) + def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w) }