Initial take on removing hardcoded value from SGFCR. See #1529

This commit is contained in:
Henrik Engstrom 2011-12-20 19:57:42 +01:00
parent f6f52c455d
commit 0dc161c800
8 changed files with 60 additions and 54 deletions

View file

@ -8,6 +8,7 @@ import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import akka.routing._ import akka.routing._
import akka.util.duration._
object DeployerSpec { object DeployerSpec {
val deployerConf = ConfigFactory.parseString(""" val deployerConf = ConfigFactory.parseString("""
@ -35,6 +36,7 @@ object DeployerSpec {
} }
/user/service-scatter-gather { /user/service-scatter-gather {
router = scatter-gather router = scatter-gather
within = 2 seconds
} }
} }
""", ConfigParseOptions.defaults) """, ConfigParseOptions.defaults)
@ -116,7 +118,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
} }
"be able to parse 'akka.actor.deployment._' with scatter-gather router" in { "be able to parse 'akka.actor.deployment._' with scatter-gather router" in {
assertRouting(ScatterGatherFirstCompletedRouter(1), "/user/service-scatter-gather") assertRouting(ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/user/service-scatter-gather")
} }
def assertRouting(expected: RouterConfig, service: String) { def assertRouting(expected: RouterConfig, service: String) {

View file

@ -6,11 +6,10 @@ package akka.routing
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor._ import akka.actor._
import collection.mutable.LinkedList import collection.mutable.LinkedList
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import akka.dispatch.Await import akka.dispatch.Await
import com.typesafe.config.ConfigFactory import akka.util.Duration
object RoutingSpec { object RoutingSpec {
@ -30,18 +29,7 @@ object RoutingSpec {
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" class RoutingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
akka {
actor {
deployment {
/a1 {
router = round-robin
nr-of-instances = 3
}
}
}
}
""")) with DefaultTimeout with ImplicitSender {
val impl = system.asInstanceOf[ActorSystemImpl] val impl = system.asInstanceOf[ActorSystemImpl]
@ -72,13 +60,16 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
} }
"be able to send their routees" in { "be able to send their routees" in {
val doneLatch = new CountDownLatch(1) val doneLatch = new TestLatch(1)
class TheActor extends Actor { class TheActor extends Actor {
val routee1 = context.actorOf(Props[TestActor], "routee1") val routee1 = context.actorOf(Props[TestActor], "routee1")
val routee2 = context.actorOf(Props[TestActor], "routee2") val routee2 = context.actorOf(Props[TestActor], "routee2")
val routee3 = context.actorOf(Props[TestActor], "routee3") val routee3 = context.actorOf(Props[TestActor], "routee3")
val router = context.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(routee1, routee2, routee3)))) val router = context.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(
routees = List(routee1, routee2, routee3),
within = 5 seconds)))
def receive = { def receive = {
case RouterRoutees(iterable) case RouterRoutees(iterable)
@ -93,7 +84,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
val theActor = system.actorOf(Props(new TheActor), "theActor") val theActor = system.actorOf(Props(new TheActor), "theActor")
theActor ! "doIt" theActor ! "doIt"
doneLatch.await(1, TimeUnit.SECONDS) must be(true) Await.ready(doneLatch, 1 seconds)
} }
} }
@ -314,7 +305,8 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
"Scatter-gather router" must { "Scatter-gather router" must {
"be started when constructed" in { "be started when constructed" in {
val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(newActor(0))))) val routedActor = system.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(routees = List(newActor(0)), within = 1 seconds)))
routedActor.isTerminated must be(false) routedActor.isTerminated must be(false)
} }
@ -337,7 +329,8 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
} }
})) }))
val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2)))) val routedActor = system.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 1 seconds)))
routedActor ! Broadcast(1) routedActor ! Broadcast(1)
routedActor ! Broadcast("end") routedActor ! Broadcast("end")
@ -350,12 +343,13 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
"return response, even if one of the actors has stopped" in { "return response, even if one of the actors has stopped" in {
val shutdownLatch = new TestLatch(1) val shutdownLatch = new TestLatch(1)
val actor1 = newActor(1, Some(shutdownLatch)) val actor1 = newActor(1, Some(shutdownLatch))
val actor2 = newActor(22, Some(shutdownLatch)) val actor2 = newActor(14, Some(shutdownLatch))
val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2)))) val routedActor = system.actorOf(Props[TestActor].withRouter(
ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2), within = 3 seconds)))
routedActor ! Broadcast(Stop(Some(1))) routedActor ! Broadcast(Stop(Some(1)))
Await.ready(shutdownLatch, TestLatch.DefaultTimeout) Await.ready(shutdownLatch, TestLatch.DefaultTimeout)
Await.result(routedActor ? Broadcast(0), timeout.duration) must be(22) Await.result(routedActor ? Broadcast(0), timeout.duration) must be(14)
} }
case class Stop(id: Option[Int] = None) case class Stop(id: Option[Int] = None)
@ -428,7 +422,10 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
//#crActors //#crActors
//#crRouter //#crRouter
case class VoteCountRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) case class VoteCountRouter(
nrOfInstances: Int = 0,
routees: Iterable[String] = Nil,
within: Duration = Duration.Zero)
extends RouterConfig { extends RouterConfig {
//#crRoute //#crRoute

View file

@ -78,6 +78,9 @@ akka {
# is ignored if routees.paths is given # is ignored if routees.paths is given
nr-of-instances = 1 nr-of-instances = 1
# within is the timeout used for routers containing future calls
within = 5 seconds
# FIXME document 'create-as', ticket 1511 # FIXME document 'create-as', ticket 1511
create-as { create-as {
# fully qualified class name of recipe implementation # fully qualified class name of recipe implementation

View file

@ -5,7 +5,6 @@
package akka.actor package akka.actor
import collection.immutable.Seq import collection.immutable.Seq
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging import akka.event.Logging
import akka.AkkaException import akka.AkkaException
import akka.config.ConfigurationException import akka.config.ConfigurationException
@ -13,6 +12,7 @@ import akka.util.Duration
import akka.event.EventStream import akka.event.EventStream
import com.typesafe.config._ import com.typesafe.config._
import akka.routing._ import akka.routing._
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) case class Deploy(path: String, config: Config, recipe: Option[ActorRecipe] = None, routing: RouterConfig = NoRouter, scope: Scope = LocalScope)
@ -53,11 +53,13 @@ class Deployer(val settings: ActorSystem.Settings) {
val nrOfInstances = deployment.getInt("nr-of-instances") val nrOfInstances = deployment.getInt("nr-of-instances")
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
val router: RouterConfig = deployment.getString("router") match { val router: RouterConfig = deployment.getString("router") match {
case "from-code" NoRouter case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees) case "round-robin" RoundRobinRouter(nrOfInstances, routees)
case "random" RandomRouter(nrOfInstances, routees) case "random" RandomRouter(nrOfInstances, routees)
case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees) case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within)
case "broadcast" BroadcastRouter(nrOfInstances, routees) case "broadcast" BroadcastRouter(nrOfInstances, routees)
case x throw new ConfigurationException("unknown router type " + x + " for path " + key) case x throw new ConfigurationException("unknown router type " + x + " for path " + key)
} }

View file

@ -5,9 +5,8 @@ package akka.routing
import akka.actor._ import akka.actor._
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.util.Timeout
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import java.util.concurrent.TimeUnit import akka.util.{ Duration, Timeout }
/** /**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to * A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to
@ -84,6 +83,8 @@ trait RouterConfig {
def routees: Iterable[String] def routees: Iterable[String]
def within: Duration
def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route
def createActor(): Router = new Router {} def createActor(): Router = new Router {}
@ -172,8 +173,9 @@ case class Destination(sender: ActorRef, recipient: ActorRef)
* Oxymoron style. * Oxymoron style.
*/ */
case object NoRouter extends RouterConfig { case object NoRouter extends RouterConfig {
def nrOfInstances: Int = 0 def nrOfInstances = 0
def routees: Iterable[String] = Nil def routees = Nil
def within = Duration.Zero
def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = null def createRoute(props: Props, actorContext: ActorContext, ref: RoutedActorRef): Route = null
} }
@ -191,7 +193,7 @@ object RoundRobinRouter {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with RoundRobinLike { case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration = Duration.Zero) extends RouterConfig with RoundRobinLike {
/** /**
* Constructor that sets nrOfInstances to be created. * Constructor that sets nrOfInstances to be created.
@ -244,7 +246,7 @@ object RandomRouter {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with RandomLike { case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration = Duration.Zero) extends RouterConfig with RandomLike {
/** /**
* Constructor that sets nrOfInstances to be created. * Constructor that sets nrOfInstances to be created.
@ -302,7 +304,7 @@ object BroadcastRouter {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) extends RouterConfig with BroadcastLike { case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration = Duration.Zero) extends RouterConfig with BroadcastLike {
/** /**
* Constructor that sets nrOfInstances to be created. * Constructor that sets nrOfInstances to be created.
@ -335,7 +337,7 @@ trait BroadcastLike { this: RouterConfig ⇒
} }
object ScatterGatherFirstCompletedRouter { object ScatterGatherFirstCompletedRouter {
def apply(routees: Iterable[ActorRef]) = new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString)) def apply(routees: Iterable[ActorRef], within: Duration) = new ScatterGatherFirstCompletedRouter(routees = routees map (_.path.toString), within = within)
} }
/** /**
* Simple router that broadcasts the message to all routees, and replies with the first response. * Simple router that broadcasts the message to all routees, and replies with the first response.
@ -348,23 +350,23 @@ object ScatterGatherFirstCompletedRouter {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil) case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, within: Duration)
extends RouterConfig with ScatterGatherFirstCompletedLike { extends RouterConfig with ScatterGatherFirstCompletedLike {
/** /**
* Constructor that sets nrOfInstances to be created. * Constructor that sets nrOfInstances to be created.
* Java API * Java API
*/ */
def this(nr: Int) = { def this(nr: Int, w: Duration) = {
this(nrOfInstances = nr) this(nrOfInstances = nr, within = w)
} }
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(t: java.util.Collection[String]) = { def this(t: java.util.Collection[String], w: Duration) = {
this(routees = collectionAsScalaIterable(t)) this(routees = collectionAsScalaIterable(t), within = w)
} }
} }
@ -374,7 +376,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒
{ {
case (sender, message) case (sender, message)
val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(5, TimeUnit.SECONDS)).get // FIXME, NO REALLY FIXME! val asker = context.asInstanceOf[ActorCell].systemImpl.provider.ask(Timeout(within)).get
asker.result.pipeTo(sender) asker.result.pipeTo(sender)
message match { message match {
case _ toAll(asker, ref.routees) case _ toAll(asker, ref.routees)

View file

@ -68,7 +68,7 @@ class ParentActor extends Actor {
case "sgfcr" case "sgfcr"
//#scatterGatherFirstCompletedRouter //#scatterGatherFirstCompletedRouter
val scatterGatherFirstCompletedRouter = context.actorOf( val scatterGatherFirstCompletedRouter = context.actorOf(
Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter()), Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter(within = 2 seconds)),
"router") "router")
implicit val timeout = context.system.settings.ActorTimeout implicit val timeout = context.system.settings.ActorTimeout
val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10) val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10)

View file

@ -26,10 +26,10 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings
if (nodes.isEmpty || deploy.routing == NoRouter) d if (nodes.isEmpty || deploy.routing == NoRouter) d
else { else {
val r = deploy.routing match { val r = deploy.routing match {
case RoundRobinRouter(x, _) RemoteRoundRobinRouter(x, nodes) case RoundRobinRouter(x, _, w) RemoteRoundRobinRouter(x, nodes, w)
case RandomRouter(x, _) RemoteRandomRouter(x, nodes) case RandomRouter(x, _, w) RemoteRandomRouter(x, nodes, w)
case BroadcastRouter(x, _) RemoteBroadcastRouter(x, nodes) case BroadcastRouter(x, _, w) RemoteBroadcastRouter(x, nodes, w)
case ScatterGatherFirstCompletedRouter(x, _) RemoteScatterGatherFirstCompletedRouter(x, nodes) case ScatterGatherFirstCompletedRouter(x, _, w) RemoteScatterGatherFirstCompletedRouter(x, nodes, w)
} }
Some(deploy.copy(routing = r)) Some(deploy.copy(routing = r))
} }

View file

@ -6,9 +6,9 @@ package akka.routing
import akka.actor._ import akka.actor._
import akka.remote._ import akka.remote._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.util.Duration
trait RemoteRouterConfig extends RouterConfig { trait RemoteRouterConfig extends RouterConfig {
override protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Vector[ActorRef] = (nrOfInstances, routees) match { override protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, routees: Iterable[String]): Vector[ActorRef] = (nrOfInstances, routees) match {
@ -39,13 +39,13 @@ trait RemoteRouterConfig extends RouterConfig {
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with RoundRobinLike { case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) extends RemoteRouterConfig with RoundRobinLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w)
} }
/** /**
@ -59,13 +59,13 @@ case class RemoteRoundRobinRouter(nrOfInstances: Int, routees: Iterable[String])
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with RandomLike { case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) extends RemoteRouterConfig with RandomLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w)
} }
/** /**
@ -79,13 +79,13 @@ case class RemoteRandomRouter(nrOfInstances: Int, routees: Iterable[String]) ext
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String]) extends RemoteRouterConfig with BroadcastLike { case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration) extends RemoteRouterConfig with BroadcastLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w)
} }
/** /**
@ -99,12 +99,12 @@ case class RemoteBroadcastRouter(nrOfInstances: Int, routees: Iterable[String])
* if you provide either 'nrOfInstances' or 'routees' to during instantiation they will * if you provide either 'nrOfInstances' or 'routees' to during instantiation they will
* be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used.
*/ */
case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String]) case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, routees: Iterable[String], within: Duration)
extends RemoteRouterConfig with ScatterGatherFirstCompletedLike { extends RemoteRouterConfig with ScatterGatherFirstCompletedLike {
/** /**
* Constructor that sets the routees to be used. * Constructor that sets the routees to be used.
* Java API * Java API
*/ */
def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) def this(n: Int, t: java.util.Collection[String], w: Duration) = this(n, t.asScala, w)
} }