2011-04-27 00:40:20 +02:00
|
|
|
/**
|
2012-01-19 18:21:06 +01:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2011-04-27 00:40:20 +02:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.actor
|
|
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
import akka.util.Duration
|
2011-12-09 20:19:59 +01:00
|
|
|
import com.typesafe.config._
|
2011-12-12 23:31:15 +01:00
|
|
|
import akka.routing._
|
2011-12-20 19:57:42 +01:00
|
|
|
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
|
2011-12-12 23:31:15 +01:00
|
|
|
|
2012-01-31 21:19:28 +01:00
|
|
|
/**
|
|
|
|
|
* This class represents deployment configuration for a given actor path. It is
|
|
|
|
|
* marked final in order to guarantee stable merge semantics (i.e. what
|
|
|
|
|
* overrides what in case multiple configuration sources are available) and is
|
|
|
|
|
* fully extensible via its Scope argument, and by the fact that an arbitrary
|
|
|
|
|
* Config section can be passed along with it (which will be merged when merging
|
|
|
|
|
* two Deploys).
|
|
|
|
|
*
|
|
|
|
|
* The path field is used only when inserting the Deploy into a deployer and
|
|
|
|
|
* not needed when just doing deploy-as-you-go:
|
|
|
|
|
*
|
|
|
|
|
* {{{
|
|
|
|
|
* context.actorOf(someProps, "someName", Deploy(scope = RemoteScope("someOtherNodeName")))
|
|
|
|
|
* }}}
|
|
|
|
|
*/
|
2012-02-07 10:28:42 +01:00
|
|
|
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
|
2012-01-31 21:19:28 +01:00
|
|
|
final case class Deploy(
|
|
|
|
|
path: String = "",
|
|
|
|
|
config: Config = ConfigFactory.empty,
|
|
|
|
|
routerConfig: RouterConfig = NoRouter,
|
2012-02-03 09:43:23 +01:00
|
|
|
scope: Scope = NoScopeGiven) {
|
2012-01-31 21:19:28 +01:00
|
|
|
|
|
|
|
|
def this(routing: RouterConfig) = this("", ConfigFactory.empty, routing)
|
|
|
|
|
def this(routing: RouterConfig, scope: Scope) = this("", ConfigFactory.empty, routing, scope)
|
2012-02-02 09:40:17 +01:00
|
|
|
def this(scope: Scope) = this("", ConfigFactory.empty, NoRouter, scope)
|
2012-01-31 21:19:28 +01:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Do a merge between this and the other Deploy, where values from “this” take
|
|
|
|
|
* precedence. The “path” of the other Deploy is not taken into account. All
|
|
|
|
|
* other members are merged using ``<X>.withFallback(other.<X>)``.
|
|
|
|
|
*/
|
2012-02-03 09:43:23 +01:00
|
|
|
def withFallback(other: Deploy): Deploy =
|
2012-01-31 21:19:28 +01:00
|
|
|
Deploy(path, config.withFallback(other.config), routerConfig.withFallback(other.routerConfig), scope.withFallback(other.scope))
|
|
|
|
|
}
|
|
|
|
|
|
2012-02-03 09:43:23 +01:00
|
|
|
/**
|
|
|
|
|
* The scope of a [[akka.actor.Deploy]] serves two purposes: as a marker for
|
|
|
|
|
* pattern matching the “scope” (i.e. local/remote/cluster) as well as for
|
|
|
|
|
* extending the information carried by the final Deploy class. Scopes can be
|
|
|
|
|
* used in conjunction with a custom [[akka.actor.ActorRefProvider]], making
|
|
|
|
|
* Akka actors fully extensible.
|
|
|
|
|
*/
|
2012-01-31 21:19:28 +01:00
|
|
|
trait Scope {
|
2012-02-03 09:43:23 +01:00
|
|
|
/**
|
|
|
|
|
* When merging [[akka.actor.Deploy]] instances using ``withFallback()`` on
|
|
|
|
|
* the left one, this is propagated to “merging” scopes in the same way.
|
|
|
|
|
* The setup is biased towards preferring the callee over the argument, i.e.
|
|
|
|
|
* ``a.withFallback(b)`` is called expecting that ``a`` should in general take
|
|
|
|
|
* precedence.
|
|
|
|
|
*/
|
2012-01-31 21:19:28 +01:00
|
|
|
def withFallback(other: Scope): Scope
|
|
|
|
|
}
|
|
|
|
|
|
2012-02-07 10:28:42 +01:00
|
|
|
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
|
2012-01-31 21:19:28 +01:00
|
|
|
case object LocalScope extends Scope {
|
|
|
|
|
/**
|
|
|
|
|
* Java API
|
|
|
|
|
*/
|
2012-02-03 09:43:23 +01:00
|
|
|
def scope: Scope = this
|
2011-12-12 23:31:15 +01:00
|
|
|
|
2012-01-31 21:19:28 +01:00
|
|
|
def withFallback(other: Scope): Scope = this
|
|
|
|
|
}
|
2011-12-12 23:31:15 +01:00
|
|
|
|
2012-01-31 21:19:28 +01:00
|
|
|
/**
|
|
|
|
|
* This is the default value and as such allows overrides.
|
|
|
|
|
*/
|
2012-02-07 10:28:42 +01:00
|
|
|
//TODO add @SerialVersionUID(1L) when SI-4804 is fixed
|
2012-02-03 09:43:23 +01:00
|
|
|
case object NoScopeGiven extends Scope {
|
2012-01-31 21:19:28 +01:00
|
|
|
def withFallback(other: Scope): Scope = other
|
|
|
|
|
}
|
2011-04-27 00:40:20 +02:00
|
|
|
|
|
|
|
|
/**
|
2011-11-08 16:49:50 +01:00
|
|
|
* Deployer maps actor paths to actor deployments.
|
2011-04-27 00:40:20 +02:00
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
2012-02-10 11:36:23 +01:00
|
|
|
class Deployer(val settings: ActorSystem.Settings, val dynamicAccess: DynamicAccess) {
|
2011-05-03 21:04:45 +02:00
|
|
|
|
2011-12-09 20:19:59 +01:00
|
|
|
import scala.collection.JavaConverters._
|
2011-05-03 21:04:45 +02:00
|
|
|
|
2011-12-09 20:19:59 +01:00
|
|
|
private val deployments = new ConcurrentHashMap[String, Deploy]
|
|
|
|
|
private val config = settings.config.getConfig("akka.actor.deployment")
|
|
|
|
|
protected val default = config.getConfig("default")
|
|
|
|
|
config.root.asScala flatMap {
|
|
|
|
|
case ("default", _) ⇒ None
|
|
|
|
|
case (key, value: ConfigObject) ⇒ parseConfig(key, value.toConfig)
|
|
|
|
|
case _ ⇒ None
|
|
|
|
|
} foreach deploy
|
2011-05-03 21:04:45 +02:00
|
|
|
|
2011-12-09 20:19:59 +01:00
|
|
|
def lookup(path: String): Option[Deploy] = Option(deployments.get(path))
|
2011-11-15 11:34:39 +01:00
|
|
|
|
2011-12-09 20:19:59 +01:00
|
|
|
def deploy(d: Deploy): Unit = deployments.put(d.path, d)
|
2011-05-03 21:04:45 +02:00
|
|
|
|
2011-12-09 20:19:59 +01:00
|
|
|
protected def parseConfig(key: String, config: Config): Option[Deploy] = {
|
2011-04-27 00:40:20 +02:00
|
|
|
|
2011-12-09 20:19:59 +01:00
|
|
|
val deployment = config.withFallback(default)
|
2011-07-26 17:12:00 +02:00
|
|
|
|
2011-12-17 16:33:29 +01:00
|
|
|
val routees = deployment.getStringList("routees.paths").asScala.toSeq
|
2011-12-12 23:31:15 +01:00
|
|
|
|
|
|
|
|
val nrOfInstances = deployment.getInt("nr-of-instances")
|
|
|
|
|
|
2011-12-20 19:57:42 +01:00
|
|
|
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
|
|
|
|
|
|
2012-01-10 15:53:27 +01:00
|
|
|
val resizer: Option[Resizer] = if (config.hasPath("resizer")) {
|
2012-01-12 16:37:08 +01:00
|
|
|
Some(DefaultResizer(deployment.getConfig("resizer")))
|
2012-01-09 20:25:24 +01:00
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
|
2011-12-12 23:31:15 +01:00
|
|
|
val router: RouterConfig = deployment.getString("router") match {
|
2012-01-11 13:56:38 +01:00
|
|
|
case "from-code" ⇒ NoRouter
|
|
|
|
|
case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer)
|
|
|
|
|
case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer)
|
|
|
|
|
case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer)
|
|
|
|
|
case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
|
|
|
|
|
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
|
2012-01-12 16:37:08 +01:00
|
|
|
case fqn ⇒
|
2012-01-30 11:48:02 +01:00
|
|
|
val args = Seq(classOf[Config] -> deployment)
|
2012-02-10 11:36:23 +01:00
|
|
|
dynamicAccess.createInstanceFor[RouterConfig](fqn, args) match {
|
2012-01-12 16:37:08 +01:00
|
|
|
case Right(router) ⇒ router
|
|
|
|
|
case Left(exception) ⇒
|
|
|
|
|
throw new IllegalArgumentException(
|
|
|
|
|
("Cannot instantiate router [%s], defined in [%s], " +
|
|
|
|
|
"make sure it extends [akka.routing.RouterConfig] and has constructor with " +
|
|
|
|
|
"[com.typesafe.config.Config] parameter")
|
|
|
|
|
.format(fqn, key), exception)
|
|
|
|
|
}
|
2011-11-19 19:55:44 +01:00
|
|
|
}
|
2011-09-28 14:50:09 +02:00
|
|
|
|
2012-02-03 09:43:23 +01:00
|
|
|
Some(Deploy(key, deployment, router, NoScopeGiven))
|
2011-04-27 00:40:20 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|