// pekko/akka-actor/src/main/scala/akka/actor/Deployer.scala

/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import collection.immutable.Seq
import java.util.concurrent.ConcurrentHashMap
import akka.event.EventHandler
import akka.actor.DeploymentConfig._
import akka.config.{ConfigurationException, Config}
import akka.util.ReflectiveAccess
import akka.AkkaException
/**
 * Programmatic deployment configuration classes. Most values have defaults and can be left out.
 *
 * Each parameterless alternative is declared twice under the same name: as a case class
 * (marked "For Java API" below) and as a case object (marked "For Scala API" below).
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object DeploymentConfig {

  /**
   * A deployment of the actor registered under `address`.
   *
   * @param address the actor address this deployment is bound to
   * @param routing routing strategy, `Direct` unless specified
   * @param format  format name, "N/A" unless specified
   * @param scope   deployment scope, `Local` unless specified
   */
  case class Deploy(
    address: String,
    routing: Routing = Direct,
    format: String = "N/A",
    scope: Scope = Local)

  /** Routing strategy of a deployment. */
  sealed trait Routing

  /** Routing through a user supplied router instance. */
  case class CustomRouter(router: AnyRef) extends Routing

  // Routing alternatives - for Java API
  case class Direct() extends Routing
  case class RoundRobin() extends Routing
  case class Random() extends Routing
  case class LeastCPU() extends Routing
  case class LeastRAM() extends Routing
  case class LeastMessages() extends Routing

  // Routing alternatives - for Scala API
  case object Direct extends Routing
  case object RoundRobin extends Routing
  case object Random extends Routing
  case object LeastCPU extends Routing
  case object LeastRAM extends Routing
  case object LeastMessages extends Routing

  /** Scope of a deployment: `Clustered` or `Local`. */
  sealed trait Scope

  /**
   * Clustered scope.
   *
   * @param home        home of the actor, `Host("localhost")` unless specified
   * @param replication replication setting, `NoReplicas` unless specified
   * @param state       state setting, `Stateful` unless specified
   */
  case class Clustered(
    home: Home = Host("localhost"),
    replication: Replication = NoReplicas,
    state: State = Stateful)
    extends Scope

  // Local scope - for Java API
  case class Local() extends Scope

  // Local scope - for Scala API
  case object Local extends Scope

  /** Home of a clustered actor, identified by host name, node name or IP address. */
  sealed trait Home
  case class Host(hostName: String) extends Home
  case class Node(nodeName: String) extends Home
  case class IP(ipAddress: String) extends Home

  /** Replication setting of a clustered actor. */
  sealed trait Replication

  /**
   * Replication with an explicit factor.
   *
   * Construction fails with an IllegalArgumentException when `factor` is zero or negative.
   */
  case class Replicate(factor: Int) extends Replication {
    if (factor <= 0) {
      throw new IllegalArgumentException("Replication factor can not be negative or zero")
    }
  }

  // Replication alternatives - for Java API
  case class AutoReplicate() extends Replication
  case class NoReplicas() extends Replication

  // Replication alternatives - for Scala API
  case object AutoReplicate extends Replication
  case object NoReplicas extends Replication

  /** State setting of a clustered actor: `Stateless` or `Stateful`. */
  sealed trait State

  // State alternatives - for Java API
  case class Stateless() extends State
  case class Stateful() extends State

  // State alternatives - for Scala API
  case object Stateless extends State
  case object Stateful extends State
}
/**
 * Deployer maps actor deployments to actor addresses.
 *
 * Lookups go to `instance` first (the cluster deployer when the cluster module is enabled,
 * otherwise `LocalDeployer`) and fall back to the configuration section
 * 'akka.actor.deployment.<address>'; a deployment found in the configuration is deployed so
 * that later lookups are served by `instance`.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object Deployer {

  val defaultAddress = Host("localhost") // FIXME allow configuring node-local default hostname and port

  /**
   * The deployer backend. Created on first access and initialised with all deployments
   * found in the configuration.
   */
  lazy val instance: ReflectiveAccess.ClusterModule.ClusterDeployer = {
    val deployer =
      if (ReflectiveAccess.ClusterModule.isEnabled) ReflectiveAccess.ClusterModule.clusterDeployer
      else LocalDeployer
    deployer.init(deploymentsInConfig)
    deployer
  }

  /** Shuts down the deployer backend. */
  def shutdown(): Unit = {
    instance.shutdown()
  }

  /**
   * Validates the address of the deployment and hands the deployment to the backend.
   *
   * @throws IllegalArgumentException if the deployment is null or its address is rejected by `Address.validate`
   */
  def deploy(deployment: Deploy): Unit = {
    if (deployment eq null) throw new IllegalArgumentException("Deploy can not be null")
    Address.validate(deployment.address)
    instance.deploy(deployment)
  }

  /** Deploys every deployment in the sequence, in order. */
  def deploy(deployment: Seq[Deploy]): Unit = {
    deployment foreach (deploy(_))
  }

  /**
   * Undeploy is idempotent. E.g. safe to invoke multiple times.
   */
  def undeploy(deployment: Deploy): Unit = {
    instance.undeploy(deployment)
  }

  /** Removes all deployments from the backend. */
  def undeployAll(): Unit = {
    instance.undeployAll()
  }

  /**
   * True when the scope of the deployment is local. Both spellings of the local scope are
   * recognised: the `Local` case object (Scala API) and an instance of the `Local()` case
   * class (Java API).
   */
  def isLocal(deployment: Deploy): Boolean = deployment match {
    case Deploy(_, _, _, Local) | Deploy(_, _, _, _: Local) => true
    case _                                                  => false
  }

  /** True when the scope of the deployment is not local. */
  def isClustered(deployment: Deploy): Boolean = !isLocal(deployment)

  /** Same as `isLocal(deployment)` for the deployment bound to the address. */
  def isLocal(address: String): Boolean = isLocal(deploymentFor(address))

  /** Negation of `isLocal(address)`. */
  def isClustered(address: String): Boolean = !isLocal(address)

  /**
   * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound.
   *
   * @throws NoDeploymentBoundException if the lookup yields no deployment
   */
  private[akka] def deploymentFor(address: String): Deploy =
    lookupDeploymentFor(address) getOrElse thrownNoDeploymentBoundException(address)

  /**
   * Looks up the deployment for the address, first in the backend and then in the configuration.
   * A deployment found in the configuration is deployed (and thereby cached) before it is returned.
   *
   * @throws ConfigurationException if the configuration for the address is invalid (logged before rethrow)
   * @throws IllegalStateException if the configuration lookup produced a null deployment
   */
  private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
    val existing = instance.lookupDeploymentFor(address)
    if (existing.isDefined && (existing.get ne null)) existing
    else {
      val newDeployment =
        try {
          lookupInConfig(address)
        } catch {
          case e: ConfigurationException =>
            EventHandler.error(e, this, e.getMessage)
            throw e
        }

      newDeployment foreach { d =>
        if (d eq null) {
          val e = new IllegalStateException("Deployment for address [" + address + "] is null")
          EventHandler.error(e, this, e.getMessage)
          throw e
        }
        deploy(d) // deploy and cache it
      }

      newDeployment
    }
  }

  /** All deployments that can be built from the addresses listed in the configuration. */
  private[akka] def deploymentsInConfig: List[Deploy] = {
    for {
      address <- addressesInConfig
      deployment <- lookupInConfig(address)
    } yield deployment
  }

  /**
   * The distinct addresses found under 'akka.actor.deployment': the part of each key in that
   * section up to the first '.'.
   */
  private[akka] def addressesInConfig: List[String] = {
    val deploymentPath = "akka.actor.deployment"
    Config.config.getSection(deploymentPath) match {
      case None => Nil
      case Some(addressConfig) =>
        addressConfig.map.keySet
          .map { path =>
            // A key without a '.' is taken as the address itself; substring(0, -1) would throw
            val dot = path.indexOf(".")
            if (dot < 0) path else path.substring(0, dot)
          }
          .toSet.toList // toSet to force uniqueness
    }
  }

  /**
   * Lookup deployment in 'akka.conf' configuration file.
   *
   * When there is no section for the address a direct, local deployment is returned, so every
   * path through this method either yields a `Some` or throws.
   *
   * @throws ConfigurationException if the router, home or replicas option has an invalid value
   */
  private[akka] def lookupInConfig(address: String): Option[Deploy] = {

    // --------------------------------
    // akka.actor.deployment.<address>
    // --------------------------------
    val addressPath = "akka.actor.deployment." + address
    Config.config.getSection(addressPath) match {
      case None => Some(Deploy(address, Direct, "N/A", Local))

      case Some(addressConfig) =>

        // --------------------------------
        // akka.actor.deployment.<address>.router
        // --------------------------------
        val router = addressConfig.getString("router", "direct") match {
          case "direct"         => Direct
          case "round-robin"    => RoundRobin
          case "random"         => Random
          case "least-cpu"      => LeastCPU
          case "least-ram"      => LeastRAM
          case "least-messages" => LeastMessages
          case customRouterClassName =>
            // Any other value is treated as the fully qualified name of a router class
            val customRouter = try {
              Class.forName(customRouterClassName).newInstance.asInstanceOf[AnyRef]
            } catch {
              // NOTE(review): catches every Throwable, as the original untyped 'case e' did
              case _: Throwable => throw new ConfigurationException(
                "Config option [" + addressPath + ".router] needs to be one of " +
                  "[\"direct\", \"round-robin\", \"random\", \"least-cpu\", \"least-ram\", \"least-messages\" or FQN of router class]")
            }
            CustomRouter(customRouter)
        }

        // --------------------------------
        // akka.actor.deployment.<address>.format
        // --------------------------------
        val format = addressConfig.getString("format", "N/A")

        // --------------------------------
        // akka.actor.deployment.<address>.clustered
        // --------------------------------
        addressConfig.getSection("clustered") match {
          case None =>
            // NOTE(review): the configured format is not applied to local deployments - confirm this is intended
            Some(Deploy(address, router, "N/A", Local)) // deploy locally

          case Some(clusteredConfig) =>

            // --------------------------------
            // akka.actor.deployment.<address>.clustered.home
            // --------------------------------
            val home = clusteredConfig.getString("home", "") match {
              case "" => Host("localhost")
              case home =>
                def raiseHomeConfigError() = throw new ConfigurationException(
                  "Config option [" + addressPath +
                    ".clustered.home] needs to be on format 'host:<hostname>', 'ip:<ip address>'' or 'node:<node name>', was [" +
                    home + "]")

                if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()

                val tokenizer = new java.util.StringTokenizer(home, ":")
                // A bare prefix such as 'host:' has no second token; report it as a configuration
                // error instead of letting the tokenizer throw NoSuchElementException
                if (tokenizer.countTokens < 2) raiseHomeConfigError()
                val protocol = tokenizer.nextToken
                val homeAddress = tokenizer.nextToken

                protocol match {
                  case "host" => Host(homeAddress)
                  case "node" => Node(homeAddress)
                  case "ip"   => IP(homeAddress)
                  case _      => raiseHomeConfigError()
                }
            }

            // --------------------------------
            // akka.actor.deployment.<address>.clustered.replicas
            // --------------------------------
            def raiseReplicasConfigError(value: Any) = throw new ConfigurationException(
              "Config option [" + addressPath +
                ".clustered.replicas] needs to be either [\"auto\"] or [1-N] - was [" +
                value + "]")

            val replicas = clusteredConfig.getAny("replicas", "1") match {
              case "auto" => AutoReplicate
              case "1"    => NoReplicas
              case nrOfReplicas: String =>
                try {
                  Replicate(nrOfReplicas.toInt)
                } catch {
                  // NumberFormatException is an IllegalArgumentException, so this covers both a
                  // non-numeric value and the 'factor < 1' rejection in Replicate
                  case e: IllegalArgumentException => raiseReplicasConfigError(nrOfReplicas)
                }
              // A value that is not a string would otherwise surface as a MatchError
              case other => raiseReplicasConfigError(other)
            }

            // --------------------------------
            // akka.actor.deployment.<address>.clustered.stateless
            // --------------------------------
            val state =
              if (clusteredConfig.getBool("stateless", false)) Stateless
              else Stateful

            Some(Deploy(address, router, format, Clustered(home, replicas, state)))
        }
    }
  }

  /** Logs and throws a DeploymentAlreadyBoundException for the deployment. */
  private def throwDeploymentBoundException(deployment: Deploy): Nothing = {
    val e = new DeploymentAlreadyBoundException(
      "Address [" + deployment.address +
        "] already bound to [" + deployment +
        "]. You have to invoke 'undeploy(deployment) first.")
    EventHandler.error(e, this, e.getMessage)
    throw e
  }

  /** Logs and throws a NoDeploymentBoundException for the address. */
  private def thrownNoDeploymentBoundException(address: String): Nothing = {
    val e = new NoDeploymentBoundException("Address [" + address + "] is not bound to a deployment")
    EventHandler.error(e, this, e.getMessage)
    throw e
  }
}
/**
 * Deployer backend that keeps the deployments in a ConcurrentHashMap keyed by actor address.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object LocalDeployer {

  private val deployments = new ConcurrentHashMap[String, Deploy]

  /** Logs the given deployments and registers each of them. */
  private[akka] def init(deployments: List[Deploy]): Unit = {
    EventHandler.info(this, "Deploying actors locally [\n\t%s\n]" format deployments.mkString("\n\t"))
    for (deployment <- deployments) deploy(deployment) // deploy
  }

  /** Removes all deployments. */
  private[akka] def shutdown(): Unit = {
    undeployAll()
    deployments.clear()
  }

  /**
   * Registers the deployment under its address unless the address is already taken;
   * an existing binding is left untouched and nothing is reported.
   */
  private[akka] def deploy(deployment: Deploy): Unit = {
    deployments.putIfAbsent(deployment.address, deployment)
    // FIXME do automatic 'undeploy' and redeploy (perhaps have it configurable if redeploy should be done or exception thrown)
    // throwDeploymentBoundException(deployment)
  }

  /** Removes whatever is registered under the address of the deployment. */
  private[akka] def undeploy(deployment: Deploy): Unit = {
    deployments.remove(deployment.address)
  }

  /** Removes every registered deployment. */
  private[akka] def undeployAll(): Unit = {
    deployments.clear()
  }

  /** The deployment registered under the address, or None when there is none. */
  private[akka] def lookupDeploymentFor(address: String): Option[Deploy] =
    Option(deployments.get(address))
}
/**
 * Validation of actor addresses.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object Address {

  private val validAddressPattern = java.util.regex.Pattern.compile("[0-9a-zA-Z\\-\\_\\$\\.]+")

  /**
   * Checks that the whole address matches the valid address pattern.
   *
   * @param address the address to check
   * @throws IllegalArgumentException if the address is null or does not match the pattern;
   *         the exception is logged through the EventHandler before it is thrown
   */
  def validate(address: String): Unit = {
    // A null address is reported as invalid rather than failing inside the matcher
    val valid = (address ne null) && validAddressPattern.matcher(address).matches
    if (!valid) {
      // The message quotes the pattern actually applied, so the two cannot drift apart
      val e = new IllegalArgumentException(
        "Address [" + address + "] is not valid, need to follow pattern " + validAddressPattern.pattern)
      EventHandler.error(e, this, e.getMessage)
      throw e
    }
  }
}
/**
 * Deployment related exception carrying only a message. It is not raised anywhere in this
 * file; the constructor is restricted to the akka package.
 */
class DeploymentException private[akka](message: String) extends AkkaException(message)
/**
 * Signals that an address already has a deployment bound to it. Built by the private
 * 'throwDeploymentBoundException' helper of Deployer; the constructor is restricted to the
 * akka package.
 */
class DeploymentAlreadyBoundException private[akka](message: String) extends AkkaException(message)
/**
 * Signals that no deployment is bound to an address. Raised through Deployer.deploymentFor;
 * the constructor is restricted to the akka package.
 */
class NoDeploymentBoundException private[akka](message: String) extends AkkaException(message)