diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 0bb70b3ffc..e6080ea96d 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ package akka.routing import java.util.concurrent.atomic.AtomicInteger diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index b364fd5247..399011fd7a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -27,7 +27,7 @@ object ActorPath { * is sorted by path elements FROM RIGHT TO LEFT, where RootActorPath > * ChildActorPath in case the number of elements is different. */ -sealed trait ActorPath extends Comparable[ActorPath] { +sealed trait ActorPath extends Comparable[ActorPath] with Serializable { /** * The Address under which this path can be reached; walks up the tree to * the RootActorPath. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 949f9e05a3..6c1ced9cdb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -72,7 +72,7 @@ trait ActorRefProvider { * in case of remote supervision). If systemService is true, deployment is * bypassed (local-only). */ - def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef + def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy] = None): InternalActorRef /** * Create actor reference for a specified local or remote path. If no such @@ -518,18 +518,18 @@ class LocalActorRefProvider( case x ⇒ x } - def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef = { + def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy] = None): InternalActorRef = { props.routerConfig match { case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor - case router ⇒ new RoutedActorRef(system, props.withRouting(adaptFromDeploy(router, path)), supervisor, path) + case router ⇒ + val depl = deploy orElse { + val lookupPath = path.elements.drop(1).mkString("/", "/", "") + deployer.lookup(lookupPath) + } + new RoutedActorRef(system, props.withRouting(router.adaptFromDeploy(depl)), supervisor, path) } } - private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = { - val lookupPath = p.elements.drop(1).mkString("/", "/", "") - r.adaptFromDeploy(deployer.lookup(lookupPath)) - } - def ask(within: Timeout): Option[AskActorRef] = { (if (within == null) settings.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 75c17a361d..8fdfb8d9eb 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -62,6 +62,12 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup */ trait RouterConfig { + def nrOfInstances: Int + + def targets: Iterable[String] + + def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Route + def createActor(): Router = new Router {} def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { @@ -72,8 +78,6 @@ trait RouterConfig { } } - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Route - protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[String]): Vector[ActorRef] = (nrOfInstances, targets) match { case (0, Nil) ⇒ throw new IllegalArgumentException("Insufficient information - missing configuration.") case (x, Nil) ⇒ (1 to x).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) @@ -106,7 +110,9 @@ case class Broadcast(message: Any) * Oxymoron style. */ case object NoRouter extends RouterConfig { - def createRoute(creator: () ⇒ Actor, actorContext: ActorContext) = null + def nrOfInstances: Int = 0 + def targets: Iterable[String] = Nil + def createRoute(creator: () ⇒ Actor, actorContext: ActorContext): Route = null } object RoundRobinRouter { @@ -123,7 +129,7 @@ object RoundRobinRouter { * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { +case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig with RoundRobinLike { /** * Constructor that sets nrOfInstances to be created. @@ -140,7 +146,9 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, targets: Iterable[String] = def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } +} +trait RoundRobinLike { this: RouterConfig ⇒ def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { val routees: Vector[ActorRef] = createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) @@ -175,7 +183,7 @@ object RandomRouter { * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { +case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig with RandomLike { /** * Constructor that sets nrOfInstances to be created. @@ -192,6 +200,9 @@ case class RandomRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } +} + +trait RandomLike { this: RouterConfig ⇒ import java.security.SecureRandom @@ -231,7 +242,7 @@ object BroadcastRouter { * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { +case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig with BroadcastLike { /** * Constructor that sets nrOfInstances to be created. @@ -248,7 +259,9 @@ case class BroadcastRouter(nrOfInstances: Int = 0, targets: Iterable[String] = N def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } +} +trait BroadcastLike { this: RouterConfig ⇒ def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { val routees: Vector[ActorRef] = createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) @@ -277,7 +290,8 @@ object ScatterGatherFirstCompletedRouter { * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. */ -case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) extends RouterConfig { +case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: Iterable[String] = Nil) + extends RouterConfig with ScatterGatherFirstCompletedLike { /** * Constructor that sets nrOfInstances to be created. @@ -294,7 +308,9 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, targets: It def this(t: java.util.Collection[String]) = { this(targets = collectionAsScalaIterable(t)) } +} +trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def createRoute(creator: () ⇒ Actor, context: ActorContext): Route = { val routees: Vector[ActorRef] = createRoutees(context.props.copy(creator = creator, routerConfig = NoRouter), context, nrOfInstances, targets) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index d316a50cea..22886901b3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -57,7 +57,7 @@ class RemoteActorRefProvider( terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) } - def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean): InternalActorRef = + def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy] = None): InternalActorRef = if (systemService) local.actorOf(system, props, supervisor, path, systemService) else { @@ -97,7 +97,7 @@ class RemoteActorRefProvider( } val elems = path.elements - val deployment = (elems.head match { + val deployment = deploy orElse (elems.head match { case "user" ⇒ deployer.lookup(elems.drop(1).mkString("/", "/", "")) case "remote" ⇒ lookupRemotes(elems) case _ ⇒ None diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index cd2abce344..0819d34019 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -4,7 +4,7 @@ package akka.remote import akka.actor._ -import akka.event.EventStream +import akka.routing._ import com.typesafe.config._ import akka.config.ConfigurationException @@ -16,16 +16,26 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings import scala.collection.JavaConverters._ import akka.util.ReflectiveAccess._ - val deployment = config.withFallback(default) - - val transform: Deploy ⇒ Deploy = - if (deployment.hasPath("remote")) deployment.getString("remote") match { - case RemoteAddressExtractor(r) ⇒ (d ⇒ d.copy(scope = RemoteScope(r))) - case x ⇒ identity - } - else identity - - super.parseConfig(path, config) map transform + super.parseConfig(path, config) match { + case d @ Some(deploy) ⇒ + deploy.config.getString("remote") match { + case RemoteAddressExtractor(r) ⇒ Some(deploy.copy(scope = RemoteScope(r))) + case str ⇒ + if (!str.isEmpty) throw new ConfigurationException("unparseable remote node name " + str) + val nodes = deploy.config.getStringList("target.nodes").asScala + if (nodes.isEmpty || deploy.routing == NoRouter) d + else { + val r = deploy.routing match { + case RoundRobinRouter(x, _) ⇒ RemoteRoundRobinRouter(x, nodes) + case RandomRouter(x, _) ⇒ RemoteRandomRouter(x, nodes) + case BroadcastRouter(x, _) ⇒ RemoteBroadcastRouter(x, nodes) + case ScatterGatherFirstCompletedRouter(x, _) ⇒ RemoteScatterGatherFirstCompletedRouter(x, nodes) + } + Some(deploy.copy(routing = r)) + } + } + case None ⇒ None + } } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala new file mode 100644 index 0000000000..2539b3939d --- /dev/null +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.routing + +import akka.actor._ +import akka.remote._ +import scala.collection.JavaConverters._ +import java.util.concurrent.atomic.AtomicInteger +import com.typesafe.config.ConfigFactory +import akka.config.ConfigurationException + +trait RemoteRouterConfig extends RouterConfig { + override protected def createRoutees(props: Props, context: ActorContext, nrOfInstances: Int, targets: Iterable[String]): Vector[ActorRef] = (nrOfInstances, targets) match { + case (_, Nil) ⇒ throw new ConfigurationException("must specify list of remote nodes") + case (n, xs) ⇒ + val nodes = targets map { + case RemoteAddressExtractor(a) ⇒ a + case x ⇒ throw new ConfigurationException("unparseable remote node " + x) + } + val node = Stream.continually(nodes).flatten.iterator + val impl = context.system.asInstanceOf[ActorSystemImpl] + Vector.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { + val name = "c" + i + val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(node.next)) + impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy)) + }) + } +} + +/** + * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the round robin should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteRoundRobinRouter(nrOfInstances: Int, targets: Iterable[String]) extends RemoteRouterConfig with RoundRobinLike { + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) +} + +/** + * A Router that randomly selects one of the target connections to send a message to. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteRandomRouter(nrOfInstances: Int, targets: Iterable[String]) extends RemoteRouterConfig with RandomLike { + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) +} + +/** + * A Router that uses broadcasts a message to all its connections. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteBroadcastRouter(nrOfInstances: Int, targets: Iterable[String]) extends RemoteRouterConfig with BroadcastLike { + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) +} + +/** + * Simple router that broadcasts the message to all routees, and replies with the first response. + *
+ * Please note that providing both 'nrOfInstances' and 'targets' does not make logical sense as this means + * that the random router should both create new actors and use the 'targets' actor(s). + * In this case the 'nrOfInstances' will be ignored and the 'targets' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'targets' to during instantiation they will + * be ignored if the 'nrOfInstances' is defined in the configuration file for the actor being used. + */ +case class RemoteScatterGatherFirstCompletedRouter(nrOfInstances: Int, targets: Iterable[String]) + extends RemoteRouterConfig with ScatterGatherFirstCompletedLike { + + /** + * Constructor that sets the targets to be used. + * Java API + */ + def this(n: Int, t: java.util.Collection[String]) = this(n, t.asScala) +} diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala new file mode 100644 index 0000000000..c9d34a3feb --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.remote + +import akka.testkit._ +import akka.routing._ +import akka.actor._ +import com.typesafe.config._ + +object RemoteRouterSpec { + class Echo extends Actor { + def receive = { + case _ ⇒ sender ! self.path + } + } +} + +class RemoteRouterSpec extends AkkaSpec(""" +akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + cluster.nodename = Nonsense + remote.server { + hostname = localhost + port = 12345 + } + actor.deployment { + /blub { + router = "round-robin" + nr-of-instances = 2 + target.nodes = ["akka://remote_sys@localhost:12346"] + } + } +} +""") with ImplicitSender { + + import RemoteRouterSpec._ + + val conf = ConfigFactory.parseString("akka.remote.server.port=12346").withFallback(system.settings.config) + val other = ActorSystem("remote_sys", conf) + + override def atTermination() { + other.stop() + } + + "A Remote Router" must { + + "deploy its children on remote host driven by configuration" in { + val router = system.actorOf(Props[Echo].withRouting(RoundRobinRouter(2)), "blub") + router ! "" + expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c1" + router ! "" + expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c2" + } + + } + +} \ No newline at end of file