2011-04-27 00:40:20 +02:00
/* *
2011-07-14 16:03:08 +02:00
* Copyright ( C ) 2009 - 2011 Typesafe Inc . < http : //www.typesafe.com>
2011-04-27 00:40:20 +02:00
*/
package akka.actor
import collection.immutable.Seq
import java.util.concurrent.ConcurrentHashMap
import akka.event.EventHandler
import akka.actor.DeploymentConfig._
2011-10-07 15:42:55 +02:00
import akka.util.Duration
2011-05-20 17:13:39 +02:00
import akka.util.ReflectiveAccess._
2011-04-29 15:47:56 +02:00
import akka.AkkaException
2011-07-26 17:12:00 +02:00
import akka.config. { Configuration , ConfigurationException , Config }
trait ActorDeployer {
private [ akka ] def init ( deployments : Seq [ Deploy ] ) : Unit
private [ akka ] def shutdown ( ) : Unit //TODO Why should we have "shutdown", should be crash only?
private [ akka ] def deploy ( deployment : Deploy ) : Unit
private [ akka ] def lookupDeploymentFor ( address : String ) : Option [ Deploy ]
private [ akka ] def deploy ( deployment : Seq [ Deploy ] ) : Unit = deployment foreach ( deploy ( _ ) )
}
2011-04-27 00:40:20 +02:00
/* *
* Deployer maps actor deployments to actor addresses .
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2011-07-26 17:12:00 +02:00
object Deployer extends ActorDeployer {
2011-05-03 21:04:45 +02:00
2011-08-09 13:33:10 +03:00
// val defaultAddress = Node(Config.nodename)
2011-05-03 21:04:45 +02:00
2011-07-26 17:12:00 +02:00
lazy val instance : ActorDeployer = {
val deployer = if ( ClusterModule . isEnabled ) ClusterModule . clusterDeployer else LocalDeployer
2011-05-03 21:04:45 +02:00
deployer . init ( deploymentsInConfig )
deployer
}
2011-07-26 17:12:00 +02:00
def start ( ) : Unit = instance . toString //Force evaluation
2011-05-25 16:18:35 +02:00
2011-07-26 17:12:00 +02:00
private [ akka ] def init ( deployments : Seq [ Deploy ] ) = instance . init ( deployments )
2011-04-27 00:40:20 +02:00
2011-07-26 17:12:00 +02:00
def shutdown ( ) : Unit = instance . shutdown ( ) //TODO Why should we have "shutdown", should be crash only?
2011-04-27 00:40:20 +02:00
2011-07-26 17:12:00 +02:00
def deploy ( deployment : Deploy ) : Unit = instance . deploy ( deployment )
2011-04-27 00:40:20 +02:00
2011-05-03 21:04:45 +02:00
def isLocal ( deployment : Deploy ) : Boolean = deployment match {
2011-09-28 14:50:09 +02:00
case Deploy ( _ , _ , _ , _ , _ , LocalScope ) | Deploy ( _ , _ , _ , _ , _ , _ : LocalScope ) ⇒ true
2011-07-26 17:12:00 +02:00
case _ ⇒ false
2011-05-03 21:04:45 +02:00
}
2011-07-26 17:12:00 +02:00
def isClustered ( deployment : Deploy ) : Boolean = ! isLocal ( deployment )
2011-05-03 21:04:45 +02:00
2011-07-26 17:12:00 +02:00
def isLocal ( address : String ) : Boolean = isLocal ( deploymentFor ( address ) ) //TODO Should this throw exception if address not found?
2011-05-03 21:04:45 +02:00
2011-07-26 17:12:00 +02:00
def isClustered ( address : String ) : Boolean = ! isLocal ( address ) //TODO Should this throw exception if address not found?
2011-05-03 21:04:45 +02:00
2011-04-27 15:00:41 +02:00
/* *
* Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound .
*/
2011-05-03 21:04:45 +02:00
private [ akka ] def deploymentFor ( address : String ) : Deploy = {
2011-04-27 15:00:41 +02:00
lookupDeploymentFor ( address ) match {
2011-05-18 17:25:30 +02:00
case Some ( deployment ) ⇒ deployment
case None ⇒ thrownNoDeploymentBoundException ( address )
2011-04-27 15:00:41 +02:00
}
}
2011-05-03 21:04:45 +02:00
private [ akka ] def lookupDeploymentFor ( address : String ) : Option [ Deploy ] = {
val deployment_? = instance . lookupDeploymentFor ( address )
2011-07-04 19:10:06 +02:00
2011-04-29 15:47:56 +02:00
if ( deployment_? . isDefined && ( deployment_? . get ne null ) ) deployment_?
2011-04-27 00:40:20 +02:00
else {
2011-07-26 17:12:00 +02:00
val newDeployment = try {
lookupInConfig ( address )
} catch {
case e : ConfigurationException ⇒
EventHandler . error ( e , this , e . getMessage )
throw e
}
2011-07-04 19:10:06 +02:00
2011-05-18 17:25:30 +02:00
newDeployment foreach { d ⇒
2011-04-27 15:00:41 +02:00
if ( d eq null ) {
val e = new IllegalStateException ( "Deployment for address [" + address + "] is null" )
EventHandler . error ( e , this , e . getMessage )
throw e
}
2011-04-29 15:47:56 +02:00
deploy ( d ) // deploy and cache it
2011-04-27 15:00:41 +02:00
}
2011-07-04 19:10:06 +02:00
2011-04-29 15:47:56 +02:00
newDeployment
2011-04-27 00:40:20 +02:00
}
}
2011-05-03 21:04:45 +02:00
private [ akka ] def deploymentsInConfig : List [ Deploy ] = {
for {
2011-05-18 17:25:30 +02:00
address ← addressesInConfig
deployment ← lookupInConfig ( address )
2011-05-03 21:04:45 +02:00
} yield deployment
}
private [ akka ] def addressesInConfig : List [ String ] = {
val deploymentPath = "akka.actor.deployment"
Config . config . getSection ( deploymentPath ) match {
2011-05-18 17:25:30 +02:00
case None ⇒ Nil
case Some ( addressConfig ) ⇒
2011-05-03 21:04:45 +02:00
addressConfig . map . keySet
2011-05-18 17:25:30 +02:00
. map ( path ⇒ path . substring ( 0 , path . indexOf ( "." ) ) )
2011-05-03 21:04:45 +02:00
. toSet . toList // toSet to force uniqueness
}
}
2011-04-27 00:40:20 +02:00
/* *
2011-04-27 15:00:41 +02:00
* Lookup deployment in 'akka. conf ' configuration file .
2011-04-27 00:40:20 +02:00
*/
2011-07-26 17:12:00 +02:00
private [ akka ] def lookupInConfig ( address : String , configuration : Configuration = Config . config ) : Option [ Deploy ] = {
import akka.util.ReflectiveAccess. { createInstance , emptyArguments , emptyParams , getClassFor }
2011-04-27 00:40:20 +02:00
// --------------------------------
// akka.actor.deployment.<address>
// --------------------------------
val addressPath = "akka.actor.deployment." + address
2011-07-26 17:12:00 +02:00
configuration . getSection ( addressPath ) match {
2011-08-31 15:07:18 +02:00
case None ⇒
2011-10-07 15:42:55 +02:00
Some ( Deploy ( address , None , Direct , NrOfInstances ( 1 ) , NoOpFailureDetector , LocalScope ) )
2011-08-31 15:07:18 +02:00
2011-05-18 17:25:30 +02:00
case Some ( addressConfig ) ⇒
2011-04-27 00:40:20 +02:00
// --------------------------------
// akka.actor.deployment.<address>.router
// --------------------------------
2011-07-26 17:12:00 +02:00
val router : Routing = addressConfig . getString ( "router" , "direct" ) match {
2011-05-18 17:25:30 +02:00
case "direct" ⇒ Direct
case "round-robin" ⇒ RoundRobin
case "random" ⇒ Random
2011-10-07 15:42:55 +02:00
case "scatter-gather" ⇒ ScatterGather
2011-05-18 17:25:30 +02:00
case "least-cpu" ⇒ LeastCPU
case "least-ram" ⇒ LeastRAM
case "least-messages" ⇒ LeastMessages
case customRouterClassName ⇒
2011-07-26 17:12:00 +02:00
createInstance [ AnyRef ] ( customRouterClassName , emptyParams , emptyArguments ) . fold (
e ⇒ throw new ConfigurationException (
2011-05-18 17:25:30 +02:00
"Config option [" + addressPath + ".router] needs to be one of " +
2011-10-07 15:42:55 +02:00
"[\"direct\", \"round-robin\", \"random\", \"scatter-gather\", \"least-cpu\", \"least-ram\", \"least-messages\" or the fully qualified name of Router class]" , e ) ,
2011-07-26 17:12:00 +02:00
CustomRouter ( _ ) )
}
2011-09-28 14:50:09 +02:00
// --------------------------------
2011-09-28 19:42:12 +02:00
// akka.actor.deployment.<address>.nr-of-instances
2011-09-28 14:50:09 +02:00
// --------------------------------
val nrOfInstances = {
2011-09-28 19:42:12 +02:00
if ( router == Direct ) NrOfInstances ( 1 )
2011-09-28 14:50:09 +02:00
else {
2011-09-28 19:42:12 +02:00
addressConfig . getAny ( "nr-of-instances" , "1" ) match {
case "auto" ⇒ AutoNrOfInstances
case "1" ⇒ NrOfInstances ( 1 )
case "0" ⇒ ZeroNrOfInstances
2011-09-28 14:50:09 +02:00
case nrOfReplicas : String ⇒
try {
2011-09-28 19:42:12 +02:00
new NrOfInstances ( nrOfReplicas . toInt )
2011-09-28 14:50:09 +02:00
} catch {
case e : Exception ⇒
throw new ConfigurationException (
"Config option [" + addressPath +
2011-09-28 19:42:12 +02:00
".nr-of-instances] needs to be either [\"auto\"] or [1-N] - was [" +
2011-09-28 14:50:09 +02:00
nrOfReplicas + "]" )
}
}
}
}
2011-08-31 15:07:18 +02:00
// --------------------------------
2011-10-07 15:42:55 +02:00
// akka.actor.deployment.<address>.failure-detector.<detector>
2011-08-31 15:07:18 +02:00
// --------------------------------
2011-09-19 15:21:18 +02:00
val failureDetectorOption : Option [ FailureDetector ] = addressConfig . getSection ( "failure-detector" ) match {
case Some ( failureDetectorConfig ) ⇒
failureDetectorConfig . keys . toList match {
case Nil ⇒ None
case detector : : Nil ⇒
detector match {
2011-10-07 15:42:55 +02:00
case "no-op" ⇒
Some ( NoOpFailureDetector )
2011-09-19 15:21:18 +02:00
case "remove-connection-on-first-failure" ⇒
Some ( RemoveConnectionOnFirstFailureFailureDetector )
case "bannage-period" ⇒
2011-10-07 15:42:55 +02:00
throw new ConfigurationException (
"Configuration for [" + addressPath + ".failure-detector.bannage-period] must have a 'time-to-ban' option defined" )
case "bannage-period.time-to-ban" ⇒
2011-09-19 15:21:18 +02:00
failureDetectorConfig . getSection ( "bannage-period" ) map { section ⇒
2011-10-07 15:42:55 +02:00
val timeToBan = Duration ( section . getInt ( "time-to-ban" , 60 ) , Config . TIME_UNIT )
BannagePeriodFailureDetector ( timeToBan )
2011-09-19 15:21:18 +02:00
}
case "custom" ⇒
failureDetectorConfig . getSection ( "custom" ) map { section ⇒
val implementationClass = section . getString ( "class" ) . getOrElse ( throw new ConfigurationException (
"Configuration for [" + addressPath +
2011-10-07 15:42:55 +02:00
".failure-detector.custom] must have a 'class' element with the fully qualified name of the failure detector class" ) )
2011-09-19 15:21:18 +02:00
CustomFailureDetector ( implementationClass )
}
case _ ⇒ None
}
case detectors ⇒
throw new ConfigurationException (
"Configuration for [" + addressPath +
2011-10-07 15:42:55 +02:00
".failure-detector] can not have multiple sections - found [" + detectors . mkString ( ", " ) + "]" )
2011-09-19 15:21:18 +02:00
}
case None ⇒ None
2011-08-31 15:07:18 +02:00
}
2011-10-07 15:42:55 +02:00
val failureDetector = failureDetectorOption getOrElse { NoOpFailureDetector } // fall back to default failure detector
2011-08-31 15:07:18 +02:00
2011-09-19 15:21:18 +02:00
// --------------------------------
// akka.actor.deployment.<address>.create-as
// --------------------------------
2011-07-26 17:12:00 +02:00
val recipe : Option [ ActorRecipe ] = addressConfig . getSection ( "create-as" ) map { section ⇒
2011-09-19 15:21:18 +02:00
val implementationClass = section . getString ( "class" ) match {
2011-07-26 17:12:00 +02:00
case Some ( impl ) ⇒
2011-09-19 15:21:18 +02:00
getClassFor [ Actor ] ( impl ) . fold ( e ⇒ throw new ConfigurationException (
"Config option [" + addressPath + ".create-as.class] load failed" , e ) , identity )
case None ⇒
throw new ConfigurationException (
"Config option [" + addressPath + ".create-as.class] is missing, need the fully qualified name of the class" )
2011-07-26 17:12:00 +02:00
}
ActorRecipe ( implementationClass )
2011-04-27 00:40:20 +02:00
}
// --------------------------------
2011-09-19 14:41:41 +02:00
// akka.actor.deployment.<address>.remote
2011-04-27 00:40:20 +02:00
// --------------------------------
2011-09-15 10:20:18 +02:00
addressConfig . getSection ( "remote" ) match {
case Some ( remoteConfig ) ⇒ // we have a 'remote' config section
2011-04-27 00:40:20 +02:00
2011-09-15 10:20:18 +02:00
if ( addressConfig . getSection ( "cluster" ) . isDefined ) throw new ConfigurationException (
"Configuration for deployment ID [" + address + "] can not have both 'remote' and 'cluster' sections." )
2011-04-27 00:40:20 +02:00
2011-10-05 18:44:27 +02:00
// --------------------------------
// akka.actor.deployment.<address>.remote.nodes
// --------------------------------
val remoteAddresses = remoteConfig . getList ( "nodes" ) match {
case Nil ⇒ Nil
case nodes ⇒
def raiseRemoteNodeParsingError ( ) = throw new ConfigurationException (
"Config option [" + addressPath +
".remote.nodes] needs to be a list with elements on format \"<hostname>:<port>\", was [" + nodes . mkString ( ", " ) + "]" )
nodes map { node ⇒
val tokenizer = new java . util . StringTokenizer ( node , ":" )
val hostname = tokenizer . nextElement . toString
if ( ( hostname eq null ) || ( hostname == "" ) ) raiseRemoteNodeParsingError ( )
val port = try tokenizer . nextElement . toString . toInt catch {
case e : Exception ⇒ raiseRemoteNodeParsingError ( )
}
if ( port == 0 ) raiseRemoteNodeParsingError ( )
RemoteAddress ( hostname , port )
}
}
2011-05-23 22:35:01 +02:00
2011-10-05 18:44:27 +02:00
Some ( Deploy ( address , recipe , router , nrOfInstances , failureDetector , RemoteScope ( remoteAddresses ) ) )
2011-09-15 10:20:18 +02:00
case None ⇒ // check for 'cluster' config section
2011-04-27 00:40:20 +02:00
// --------------------------------
2011-09-15 10:20:18 +02:00
// akka.actor.deployment.<address>.cluster
2011-04-27 00:40:20 +02:00
// --------------------------------
2011-09-15 10:20:18 +02:00
addressConfig . getSection ( "cluster" ) match {
case None ⇒
2011-10-07 15:42:55 +02:00
Some ( Deploy ( address , recipe , router , nrOfInstances , NoOpFailureDetector , LocalScope ) ) // deploy locally
2011-09-15 10:20:18 +02:00
case Some ( clusterConfig ) ⇒
// --------------------------------
// akka.actor.deployment.<address>.cluster.preferred-nodes
// --------------------------------
val preferredNodes = clusterConfig . getList ( "preferred-nodes" ) match {
case Nil ⇒ Nil
case homes ⇒
def raiseHomeConfigError ( ) = throw new ConfigurationException (
"Config option [" + addressPath +
".cluster.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" +
homes + "]" )
homes map { home ⇒
if ( ! ( home . startsWith ( "host:" ) || home . startsWith ( "node:" ) || home . startsWith ( "ip:" ) ) ) raiseHomeConfigError ( )
val tokenizer = new java . util . StringTokenizer ( home , ":" )
val protocol = tokenizer . nextElement
val address = tokenizer . nextElement . asInstanceOf [ String ]
protocol match {
//case "host" ⇒ Host(address)
case "node" ⇒ Node ( address )
//case "ip" ⇒ IP(address)
case _ ⇒ raiseHomeConfigError ( )
}
2011-07-08 08:28:13 +02:00
}
2011-04-27 00:40:20 +02:00
}
2011-09-15 10:20:18 +02:00
// --------------------------------
// akka.actor.deployment.<address>.cluster.replication
// --------------------------------
clusterConfig . getSection ( "replication" ) match {
case None ⇒
2011-09-28 14:50:09 +02:00
Some ( Deploy ( address , recipe , router , nrOfInstances , failureDetector , ClusterScope ( preferredNodes , Transient ) ) )
2011-09-15 10:20:18 +02:00
case Some ( replicationConfig ) ⇒
val storage = replicationConfig . getString ( "storage" , "transaction-log" ) match {
case "transaction-log" ⇒ TransactionLog
case "data-grid" ⇒ DataGrid
case unknown ⇒
throw new ConfigurationException ( "Config option [" + addressPath +
".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" +
unknown + "]" )
}
val strategy = replicationConfig . getString ( "strategy" , "write-through" ) match {
case "write-through" ⇒ WriteThrough
case "write-behind" ⇒ WriteBehind
case unknown ⇒
throw new ConfigurationException ( "Config option [" + addressPath +
".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
unknown + "]" )
}
2011-09-28 14:50:09 +02:00
Some ( Deploy ( address , recipe , router , nrOfInstances , failureDetector , ClusterScope ( preferredNodes , Replication ( storage , strategy ) ) ) )
2011-06-07 11:10:29 -07:00
}
}
2011-04-27 00:40:20 +02:00
}
}
}
2011-05-30 10:53:25 +02:00
private [ akka ] def throwDeploymentBoundException ( deployment : Deploy ) : Nothing = {
2011-07-26 17:12:00 +02:00
val e = new DeploymentAlreadyBoundException ( "Address [" + deployment . address + "] already bound to [" + deployment + "]" )
2011-04-27 00:40:20 +02:00
EventHandler . error ( e , this , e . getMessage )
throw e
}
2011-05-30 10:53:25 +02:00
private [ akka ] def thrownNoDeploymentBoundException ( address : String ) : Nothing = {
2011-04-27 00:40:20 +02:00
val e = new NoDeploymentBoundException ( "Address [" + address + "] is not bound to a deployment" )
EventHandler . error ( e , this , e . getMessage )
throw e
}
}
2011-04-29 15:47:56 +02:00
/* *
2011-09-28 14:50:09 +02:00
* Simple local deployer , only for internal use .
2011-05-18 12:25:27 +02:00
*
2011-04-29 15:47:56 +02:00
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2011-07-26 17:12:00 +02:00
object LocalDeployer extends ActorDeployer {
2011-04-29 15:47:56 +02:00
private val deployments = new ConcurrentHashMap [ String , Deploy ]
2011-07-26 17:12:00 +02:00
private [ akka ] def init ( deployments : Seq [ Deploy ] ) {
2011-05-03 21:04:45 +02:00
deployments foreach ( deploy ( _ ) ) // deploy
}
private [ akka ] def shutdown ( ) {
2011-07-26 17:12:00 +02:00
deployments . clear ( ) //TODO do something else/more?
2011-05-03 21:04:45 +02:00
}
private [ akka ] def deploy ( deployment : Deploy ) {
2011-09-28 14:50:09 +02:00
deployments . putIfAbsent ( deployment . address , deployment )
2011-04-29 15:47:56 +02:00
}
2011-05-21 16:14:15 +02:00
private [ akka ] def lookupDeploymentFor ( address : String ) : Option [ Deploy ] = Option ( deployments . get ( address ) )
2011-04-29 15:47:56 +02:00
}
2011-05-18 17:25:30 +02:00
class DeploymentException private [ akka ] ( message : String ) extends AkkaException ( message )
class DeploymentAlreadyBoundException private [ akka ] ( message : String ) extends AkkaException ( message )
class NoDeploymentBoundException private [ akka ] ( message : String ) extends AkkaException ( message )