2011-09-15 10:20:18 +02:00
/* *
* Copyright ( C ) 2009 - 2011 Typesafe Inc . < http : //www.typesafe.com>
*/
2011-09-20 21:44:50 +02:00
package akka.remote
2011-09-15 10:20:18 +02:00
import akka.actor._
2011-10-05 18:44:27 +02:00
import akka.routing._
2011-10-07 15:42:55 +02:00
import akka.actor.Actor._
import akka.actor.Status._
2011-10-13 17:42:26 +02:00
import akka.dispatch._
2011-09-19 14:43:28 +02:00
import akka.util.duration._
import akka.config.ConfigurationException
2011-09-15 10:20:18 +02:00
import akka.AkkaException
import RemoteProtocol._
2011-09-19 14:43:28 +02:00
import RemoteDaemonMessageType._
import akka.serialization. { Serialization , Serializer , ActorSerialization , Compression }
import Compression.LZF
2011-09-15 10:20:18 +02:00
import java.net.InetSocketAddress
2011-09-19 14:43:28 +02:00
import com.google.protobuf.ByteString
2011-10-12 09:10:05 +02:00
import akka.AkkaApplication
2011-10-17 14:32:31 +02:00
import akka.event. { DeathWatch , EventHandler }
2011-09-19 14:43:28 +02:00
2011-09-15 10:20:18 +02:00
/* *
2011-09-19 14:43:28 +02:00
* Remote ActorRefProvider . Starts up actor on remote node and creates a RemoteActorRef representing it .
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-09-15 10:20:18 +02:00
*/
2011-10-13 17:42:26 +02:00
class RemoteActorRefProvider ( val app : AkkaApplication ) extends ActorRefProvider {
2011-09-15 10:20:18 +02:00
2011-09-27 10:40:27 +02:00
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
2011-10-12 11:34:35 +02:00
2011-10-13 17:42:26 +02:00
val local = new LocalActorRefProvider ( app )
val remote = new Remote ( app )
2011-09-27 10:40:27 +02:00
2011-10-18 11:26:35 +02:00
private val actors = new ConcurrentHashMap [ String , AnyRef ]
2011-09-27 10:40:27 +02:00
2011-10-13 13:41:44 +02:00
private val remoteDaemonConnectionManager = new RemoteConnectionManager (
2011-10-13 14:23:44 +02:00
app ,
2011-10-13 13:41:44 +02:00
remote = remote ,
failureDetector = new BannagePeriodFailureDetector ( 60 seconds ) ) // FIXME make timeout configurable
2011-09-19 14:43:28 +02:00
2011-10-13 17:42:26 +02:00
def defaultDispatcher = app . dispatcher
def defaultTimeout = app . AkkaConfig . ActorTimeout
def actorOf ( props : Props , address : String ) : ActorRef = actorOf ( props , address , false )
2011-10-18 14:21:48 +02:00
def actorOf ( props : Props , address : String , systemService : Boolean ) : ActorRef =
if ( systemService ) local . actorOf ( props , address , systemService )
else {
val newFuture = Promise [ ActorRef ] ( 5000 ) ( defaultDispatcher ) // FIXME is this proper timeout?
actors . putIfAbsent ( address , newFuture ) match { // we won the race -- create the actor and resolve the future
case null ⇒
val actor : ActorRef = try {
app . deployer . lookupDeploymentFor ( address ) match {
case Some ( DeploymentConfig . Deploy ( _ , _ , routerType , nrOfInstances , failureDetectorType , DeploymentConfig . RemoteScope ( remoteAddresses ) ) ) ⇒
val failureDetector = DeploymentConfig . failureDetectorTypeFor ( failureDetectorType ) match {
case FailureDetectorType . NoOp ⇒ new NoOpFailureDetector
case FailureDetectorType . RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector
case FailureDetectorType . BannagePeriod ( timeToBan ) ⇒ new BannagePeriodFailureDetector ( timeToBan )
case FailureDetectorType . Custom ( implClass ) ⇒ FailureDetector . createCustomFailureDetector ( implClass )
2011-10-18 11:26:35 +02:00
}
2011-10-18 14:21:48 +02:00
val thisHostname = remote . address . getHostName
val thisPort = remote . address . getPort
def isReplicaNode : Boolean = remoteAddresses exists { some ⇒ some . hostname == thisHostname && some . port == thisPort }
if ( isReplicaNode ) {
// we are on one of the replica node for this remote actor
new LocalActorRef ( app , props , address , false )
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory : ( ) ⇒ Router = DeploymentConfig . routerTypeFor ( routerType ) match {
case RouterType . Direct ⇒
if ( remoteAddresses . size != 1 ) throw new ConfigurationException (
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
. format ( address , remoteAddresses . mkString ( ", " ) ) )
( ) ⇒ new DirectRouter
case RouterType . Random ⇒
if ( remoteAddresses . size < 1 ) throw new ConfigurationException (
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
. format ( address , remoteAddresses . mkString ( ", " ) ) )
( ) ⇒ new RandomRouter
case RouterType . RoundRobin ⇒
if ( remoteAddresses . size < 1 ) throw new ConfigurationException (
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
. format ( address , remoteAddresses . mkString ( ", " ) ) )
( ) ⇒ new RoundRobinRouter
case RouterType . ScatterGather ⇒
if ( remoteAddresses . size < 1 ) throw new ConfigurationException (
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
. format ( address , remoteAddresses . mkString ( ", " ) ) )
( ) ⇒ new ScatterGatherFirstCompletedRouter ( ) ( defaultDispatcher , defaultTimeout )
case RouterType . LeastCPU ⇒ sys . error ( "Router LeastCPU not supported yet" )
case RouterType . LeastRAM ⇒ sys . error ( "Router LeastRAM not supported yet" )
case RouterType . LeastMessages ⇒ sys . error ( "Router LeastMessages not supported yet" )
case RouterType . Custom ( implClass ) ⇒ ( ) ⇒ Routing . createCustomRouter ( implClass )
}
val connections = ( Map . empty [ InetSocketAddress , ActorRef ] /: remoteAddresses ) { ( conns , a ) ⇒
val inetAddr = new InetSocketAddress ( a . hostname , a . port )
conns + ( inetAddr -> RemoteActorRef ( remote . server , inetAddr , address , None ) )
}
val connectionManager = new RemoteConnectionManager ( app , remote , connections , failureDetector )
connections . keys foreach { useActorOnNode ( _ , address , props . creator ) }
actorOf ( RoutedProps ( routerFactory = routerFactory , connectionManager = connectionManager ) , address )
2011-10-18 11:26:35 +02:00
}
2011-10-18 14:21:48 +02:00
case deploy ⇒ local . actorOf ( props , address , systemService )
}
} catch {
case e : Exception ⇒
newFuture completeWithException e // so the other threads gets notified of error
throw e
}
2011-10-18 11:26:35 +02:00
2011-10-18 14:21:48 +02:00
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
2011-09-27 10:40:27 +02:00
2011-10-18 14:21:48 +02:00
newFuture completeWithResult actor
actors . replace ( address , newFuture , actor )
actor
case actor : ActorRef ⇒ actor
case future : Future [ _ ] ⇒ future . get . asInstanceOf [ ActorRef ]
}
2011-09-15 10:20:18 +02:00
}
2011-10-13 13:41:44 +02:00
/* *
* Copied from LocalActorRefProvider . . .
*/
2011-10-13 17:42:26 +02:00
def actorOf ( props : RoutedProps , address : String ) : ActorRef = {
2011-10-18 14:00:46 +02:00
if ( props . connectionManager . isEmpty ) throw new ConfigurationException ( "RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router" )
2011-10-13 17:42:26 +02:00
new RoutedActorRef ( props , address )
2011-10-13 13:41:44 +02:00
}
2011-10-07 19:42:10 +02:00
def actorFor ( address : String ) : Option [ ActorRef ] = actors . get ( address ) match {
2011-10-18 11:26:35 +02:00
case null ⇒ None
case actor : ActorRef ⇒ Some ( actor )
case future : Future [ _ ] ⇒ Some ( future . get . asInstanceOf [ ActorRef ] )
2011-10-07 19:42:10 +02:00
}
2011-09-15 10:20:18 +02:00
2011-09-27 11:49:42 +02:00
/* *
* Returns true if the actor was in the provider 's cache and evicted successfully , else false .
*/
private [ akka ] def evict ( address : String ) : Boolean = actors . remove ( address ) ne null
2011-10-13 17:42:26 +02:00
private [ akka ] def deserialize ( actor : SerializedActorRef ) : Option [ ActorRef ] = {
local . actorFor ( actor . address ) orElse {
Some ( RemoteActorRef ( remote . server , new InetSocketAddress ( actor . hostname , actor . port ) , actor . address , None ) )
}
}
2011-09-19 14:43:28 +02:00
/* *
* Using ( checking out ) actor on a specific node .
*/
def useActorOnNode ( remoteAddress : InetSocketAddress , actorAddress : String , actorFactory : ( ) ⇒ Actor ) {
2011-10-13 13:16:41 +02:00
app . eventHandler . debug ( this , "Instantiating Actor [%s] on node [%s]" . format ( actorAddress , remoteAddress ) )
2011-09-19 14:43:28 +02:00
val actorFactoryBytes =
2011-10-12 09:10:05 +02:00
app . serialization . serialize ( actorFactory ) match {
2011-09-19 14:43:28 +02:00
case Left ( error ) ⇒ throw error
case Right ( bytes ) ⇒
2011-10-12 09:10:05 +02:00
if ( remote . shouldCompressData ) LZF . compress ( bytes )
2011-09-19 14:43:28 +02:00
else bytes
}
val command = RemoteDaemonMessageProtocol . newBuilder
. setMessageType ( USE )
. setActorAddress ( actorAddress )
. setPayload ( ByteString . copyFrom ( actorFactoryBytes ) )
. build ( )
2011-09-22 03:36:59 +02:00
val connectionFactory =
2011-10-12 09:10:05 +02:00
( ) ⇒ remote . server . actorFor (
remote . remoteDaemonServiceName , remoteAddress . getHostName , remoteAddress . getPort )
2011-09-19 14:43:28 +02:00
// try to get the connection for the remote address, if not already there then create it
2011-10-07 15:42:55 +02:00
val connection = remoteDaemonConnectionManager . putIfAbsent ( remoteAddress , connectionFactory )
2011-09-19 14:43:28 +02:00
2011-09-22 03:36:59 +02:00
sendCommandToRemoteNode ( connection , command , withACK = true ) // ensure we get an ACK on the USE command
2011-09-15 10:20:18 +02:00
}
2011-09-19 14:43:28 +02:00
private def sendCommandToRemoteNode (
2011-09-15 10:20:18 +02:00
connection : ActorRef ,
command : RemoteDaemonMessageProtocol ,
2011-09-22 03:36:59 +02:00
withACK : Boolean ) {
2011-09-15 10:20:18 +02:00
2011-09-22 03:36:59 +02:00
if ( withACK ) {
2011-09-15 10:20:18 +02:00
try {
2011-10-12 09:10:05 +02:00
( connection ? ( command , remote . remoteDaemonAckTimeout ) ) . as [ Status ] match {
2011-09-19 14:43:28 +02:00
case Some ( Success ( receiver ) ) ⇒
2011-10-13 13:16:41 +02:00
app . eventHandler . debug ( this , "Remote command sent to [%s] successfully received" . format ( receiver ) )
2011-09-15 10:20:18 +02:00
case Some ( Failure ( cause ) ) ⇒
2011-10-13 13:16:41 +02:00
app . eventHandler . error ( cause , this , cause . toString )
2011-09-15 10:20:18 +02:00
throw cause
case None ⇒
val error = new RemoteException ( "Remote command to [%s] timed out" . format ( connection . address ) )
2011-10-13 13:16:41 +02:00
app . eventHandler . error ( error , this , error . toString )
2011-09-15 10:20:18 +02:00
throw error
}
} catch {
case e : Exception ⇒
2011-10-13 13:16:41 +02:00
app . eventHandler . error ( e , this , "Could not send remote command to [%s] due to: %s" . format ( connection . address , e . toString ) )
2011-09-15 10:20:18 +02:00
throw e
}
2011-09-22 03:36:59 +02:00
} else {
connection ! command
2011-09-15 10:20:18 +02:00
}
}
2011-10-17 14:32:31 +02:00
private [ akka ] def createDeathWatch ( ) : DeathWatch = local . createDeathWatch ( ) //FIXME Implement Remote DeathWatch
2011-09-15 10:20:18 +02:00
}
2011-10-13 17:42:26 +02:00
/* *
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node .
* This reference is network - aware ( remembers its origin ) and immutable .
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
private [ akka ] case class RemoteActorRef private [ akka ] (
val remote : RemoteSupport ,
val remoteAddress : InetSocketAddress ,
val address : String ,
loader : Option [ ClassLoader ] )
extends ActorRef with ScalaActorRef {
2011-10-18 16:59:57 +02:00
private [ akka ] val uuid : Uuid = newUuid
2011-10-13 17:42:26 +02:00
@volatile
private var running : Boolean = true
def isShutdown : Boolean = ! running
def postMessageToMailbox ( message : Any , channel : UntypedChannel ) {
val chSender = if ( channel . isInstanceOf [ ActorRef ] ) Some ( channel . asInstanceOf [ ActorRef ] ) else None
remote . send [ Any ] ( message , chSender , None , remoteAddress , true , this , loader )
}
def postMessageToMailboxAndCreateFutureResultWithTimeout (
message : Any ,
timeout : Timeout ,
channel : UntypedChannel ) : Future [ Any ] = {
val chSender = if ( channel . isInstanceOf [ ActorRef ] ) Some ( channel . asInstanceOf [ ActorRef ] ) else None
val chFuture = if ( channel . isInstanceOf [ Promise [ _ ] ] ) Some ( channel . asInstanceOf [ Promise [ Any ] ] ) else None
val future = remote . send [ Any ] ( message , chSender , chFuture , remoteAddress , false , this , loader )
if ( future . isDefined ) ActorPromise ( future . get ) ( timeout )
else throw new IllegalActorStateException ( "Expected a future from remote call to actor " + toString )
}
def suspend ( ) : Unit = unsupported
def resume ( ) : Unit = unsupported
def stop ( ) { //FIXME send the cause as well!
synchronized {
if ( running ) {
running = false
2011-10-17 19:31:59 +02:00
postMessageToMailbox ( Terminate , None )
2011-10-13 17:42:26 +02:00
}
}
}
@throws ( classOf [ java . io . ObjectStreamException ] )
private def writeReplace ( ) : AnyRef = {
SerializedActorRef ( uuid , address , remoteAddress . getAddress . getHostAddress , remoteAddress . getPort )
}
2011-10-14 15:09:46 +02:00
def startsMonitoring ( actorRef : ActorRef ) : ActorRef = unsupported
2011-10-13 17:42:26 +02:00
2011-10-14 15:09:46 +02:00
def stopsMonitoring ( actorRef : ActorRef ) : ActorRef = unsupported
2011-10-13 17:42:26 +02:00
protected [ akka ] def restart ( cause : Throwable ) : Unit = unsupported
private def unsupported = throw new UnsupportedOperationException ( "Not supported for RemoteActorRef" )
}