2011-09-15 10:20:18 +02:00
/* *
* Copyright ( C ) 2009 - 2011 Typesafe Inc . < http : //www.typesafe.com>
*/
2011-09-20 21:44:50 +02:00
package akka.remote
2011-09-15 10:20:18 +02:00
2011-11-10 20:08:00 +01:00
import akka.AkkaException
2011-09-15 10:20:18 +02:00
import akka.actor._
2011-10-07 15:42:55 +02:00
import akka.actor.Actor._
import akka.actor.Status._
2011-10-20 15:11:34 +02:00
import akka.routing._
2011-10-13 17:42:26 +02:00
import akka.dispatch._
2011-09-19 14:43:28 +02:00
import akka.util.duration._
import akka.config.ConfigurationException
2011-10-27 12:23:01 +02:00
import akka.event. { DeathWatch , Logging }
2011-10-23 18:40:00 +02:00
import akka.serialization. { Serialization , Serializer , Compression }
2011-10-20 15:11:34 +02:00
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
2011-09-15 10:20:18 +02:00
import java.net.InetSocketAddress
2011-10-20 15:11:34 +02:00
import java.util.concurrent.ConcurrentHashMap
2011-09-19 14:43:28 +02:00
import com.google.protobuf.ByteString
2011-11-03 14:53:38 +01:00
import java.util.concurrent.atomic.AtomicBoolean
2011-11-14 14:21:53 +01:00
import akka.event.EventStream
2011-09-19 14:43:28 +02:00
2011-09-15 10:20:18 +02:00
/* *
2011-09-19 14:43:28 +02:00
* Remote ActorRefProvider . Starts up actor on remote node and creates a RemoteActorRef representing it .
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-09-15 10:20:18 +02:00
*/
2011-11-14 14:21:53 +01:00
class RemoteActorRefProvider (
2011-11-14 18:18:08 +01:00
val AkkaConfig : ActorSystem . AkkaConfig ,
2011-11-17 10:54:17 +01:00
val rootPath : ActorPath ,
2011-11-14 14:21:53 +01:00
val eventStream : EventStream ,
val dispatcher : MessageDispatcher ,
val scheduler : Scheduler ) extends ActorRefProvider {
2011-09-15 10:20:18 +02:00
2011-11-16 17:18:36 +01:00
val log = Logging ( eventStream , this )
2011-10-27 12:23:01 +02:00
2011-09-27 10:40:27 +02:00
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
2011-10-12 11:34:35 +02:00
2011-11-17 10:54:17 +01:00
val local = new LocalActorRefProvider ( AkkaConfig , rootPath , eventStream , dispatcher , scheduler )
2011-11-14 14:21:53 +01:00
def deathWatch = local . deathWatch
def guardian = local . guardian
def systemGuardian = local . systemGuardian
2011-11-14 18:18:08 +01:00
def nodename = local . nodename
2011-11-16 17:18:36 +01:00
def tempName = local . tempName
2011-11-14 14:21:53 +01:00
2011-11-16 17:18:36 +01:00
@volatile
var remote : Remote = _
2011-09-27 10:40:27 +02:00
2011-10-18 11:26:35 +02:00
private val actors = new ConcurrentHashMap [ String , AnyRef ]
2011-09-27 10:40:27 +02:00
2011-11-16 17:18:36 +01:00
@volatile
private var remoteDaemonConnectionManager : RemoteConnectionManager = _
def init ( app : ActorSystemImpl ) {
local . init ( app )
remote = new Remote ( app , nodename )
remoteDaemonConnectionManager = new RemoteConnectionManager ( app , remote )
}
2011-09-19 14:43:28 +02:00
2011-10-26 16:12:48 +02:00
private [ akka ] def theOneWhoWalksTheBubblesOfSpaceTime : ActorRef = local . theOneWhoWalksTheBubblesOfSpaceTime
2011-10-28 16:00:06 +02:00
private [ akka ] def terminationFuture = local . terminationFuture
2011-10-26 16:12:48 +02:00
2011-11-04 10:11:07 +01:00
private [ akka ] def deployer : Deployer = local . deployer
2011-11-16 17:18:36 +01:00
def defaultDispatcher = dispatcher
def defaultTimeout = AkkaConfig . ActorTimeout
2011-10-13 17:42:26 +02:00
2011-11-16 17:18:36 +01:00
private [ akka ] def actorOf ( app : ActorSystemImpl , props : Props , supervisor : ActorRef , name : String , systemService : Boolean ) : ActorRef =
actorOf ( app , props , supervisor , supervisor . path / name , systemService )
2011-11-08 11:56:46 +01:00
2011-11-16 17:18:36 +01:00
private [ akka ] def actorOf ( app : ActorSystemImpl , props : Props , supervisor : ActorRef , path : ActorPath , systemService : Boolean ) : ActorRef =
if ( systemService ) local . actorOf ( app , props , supervisor , path , systemService )
2011-10-18 14:21:48 +02:00
else {
2011-11-08 11:56:46 +01:00
val name = path . name
2011-10-18 14:21:48 +02:00
val newFuture = Promise [ ActorRef ] ( 5000 ) ( defaultDispatcher ) // FIXME is this proper timeout?
2011-11-08 11:56:46 +01:00
actors . putIfAbsent ( path . toString , newFuture ) match { // we won the race -- create the actor and resolve the future
2011-10-18 14:21:48 +02:00
case null ⇒
val actor : ActorRef = try {
2011-11-08 16:49:50 +01:00
deployer . lookupDeploymentFor ( path . toString ) match {
2011-11-10 11:50:11 +01:00
case Some ( DeploymentConfig . Deploy ( _ , _ , routerType , nrOfInstances , DeploymentConfig . RemoteScope ( remoteAddresses ) ) ) ⇒
2011-10-18 14:21:48 +02:00
2011-10-20 15:11:34 +02:00
// FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one
// val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match {
// case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector
// case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector
// case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan)
// case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass)
// }
2011-10-18 11:26:35 +02:00
2011-11-10 19:03:18 +01:00
def isReplicaNode : Boolean = remoteAddresses exists { _ == app . address }
2011-10-18 14:21:48 +02:00
2011-11-03 14:53:38 +01:00
//app.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(app.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
2011-10-18 14:21:48 +02:00
if ( isReplicaNode ) {
// we are on one of the replica node for this remote actor
2011-11-16 17:18:36 +01:00
local . actorOf ( app , props , supervisor , name , true ) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
2011-10-18 14:21:48 +02:00
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
val routerFactory : ( ) ⇒ Router = DeploymentConfig . routerTypeFor ( routerType ) match {
case RouterType . Direct ⇒
if ( remoteAddresses . size != 1 ) throw new ConfigurationException (
"Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
2011-11-08 11:56:46 +01:00
. format ( name , remoteAddresses . mkString ( ", " ) ) )
2011-10-18 14:21:48 +02:00
( ) ⇒ new DirectRouter
case RouterType . Random ⇒
if ( remoteAddresses . size < 1 ) throw new ConfigurationException (
"Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
2011-11-08 11:56:46 +01:00
. format ( name , remoteAddresses . mkString ( ", " ) ) )
2011-10-18 14:21:48 +02:00
( ) ⇒ new RandomRouter
case RouterType . RoundRobin ⇒
if ( remoteAddresses . size < 1 ) throw new ConfigurationException (
"Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
2011-11-08 11:56:46 +01:00
. format ( name , remoteAddresses . mkString ( ", " ) ) )
2011-10-18 14:21:48 +02:00
( ) ⇒ new RoundRobinRouter
case RouterType . ScatterGather ⇒
if ( remoteAddresses . size < 1 ) throw new ConfigurationException (
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
2011-11-08 11:56:46 +01:00
. format ( name , remoteAddresses . mkString ( ", " ) ) )
2011-10-18 14:21:48 +02:00
( ) ⇒ new ScatterGatherFirstCompletedRouter ( ) ( defaultDispatcher , defaultTimeout )
case RouterType . LeastCPU ⇒ sys . error ( "Router LeastCPU not supported yet" )
case RouterType . LeastRAM ⇒ sys . error ( "Router LeastRAM not supported yet" )
case RouterType . LeastMessages ⇒ sys . error ( "Router LeastMessages not supported yet" )
case RouterType . Custom ( implClass ) ⇒ ( ) ⇒ Routing . createCustomRouter ( implClass )
}
2011-11-10 17:39:04 +01:00
val connections = ( Map . empty [ RemoteAddress , ActorRef ] /: remoteAddresses ) { ( conns , a ) ⇒
val remoteAddress = RemoteAddress ( a . hostname , a . port )
2011-11-16 17:18:36 +01:00
conns + ( remoteAddress -> RemoteActorRef ( remote . app . provider , remote . server , remoteAddress , path , None ) )
2011-10-18 14:21:48 +02:00
}
2011-10-20 15:11:34 +02:00
val connectionManager = new RemoteConnectionManager ( app , remote , connections )
2011-10-18 14:21:48 +02:00
2011-11-16 17:18:36 +01:00
connections . keys foreach { useActorOnNode ( app , _ , path . toString , props . creator ) }
2011-10-18 14:21:48 +02:00
2011-11-16 17:18:36 +01:00
actorOf ( app , RoutedProps ( routerFactory = routerFactory , connectionManager = connectionManager ) , supervisor , name )
2011-10-18 11:26:35 +02:00
}
2011-11-16 17:18:36 +01:00
case deploy ⇒ local . actorOf ( app , props , supervisor , name , systemService )
2011-09-27 10:40:27 +02:00
}
2011-10-18 14:21:48 +02:00
} catch {
case e : Exception ⇒
newFuture completeWithException e // so the other threads gets notified of error
throw e
}
2011-10-18 11:26:35 +02:00
2011-10-18 14:21:48 +02:00
// actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later
2011-09-27 10:40:27 +02:00
2011-10-18 14:21:48 +02:00
newFuture completeWithResult actor
2011-11-08 11:56:46 +01:00
actors . replace ( path . toString , newFuture , actor )
2011-10-18 14:21:48 +02:00
actor
case actor : ActorRef ⇒ actor
case future : Future [ _ ] ⇒ future . get . asInstanceOf [ ActorRef ]
2011-09-27 16:52:33 +02:00
}
2011-09-15 10:20:18 +02:00
}
2011-10-13 13:41:44 +02:00
/* *
* Copied from LocalActorRefProvider . . .
*/
2011-10-18 15:39:26 +02:00
// FIXME: implement supervision
2011-11-16 17:18:36 +01:00
def actorOf ( app : ActorSystem , props : RoutedProps , supervisor : ActorRef , name : String ) : ActorRef = {
2011-11-08 11:56:46 +01:00
if ( props . connectionManager . isEmpty ) throw new ConfigurationException ( "RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router" )
new RoutedActorRef ( app , props , supervisor , name )
2011-10-13 13:41:44 +02:00
}
2011-11-08 11:56:46 +01:00
def actorFor ( path : Iterable [ String ] ) : Option [ ActorRef ] = actors . get ( ActorPath . join ( path ) ) match {
case null ⇒ local . actorFor ( path )
2011-10-18 11:26:35 +02:00
case actor : ActorRef ⇒ Some ( actor )
case future : Future [ _ ] ⇒ Some ( future . get . asInstanceOf [ ActorRef ] )
2011-10-07 19:42:10 +02:00
}
2011-09-15 10:20:18 +02:00
2011-11-16 17:18:36 +01:00
// TODO remove me
2011-11-03 14:53:38 +01:00
val optimizeLocal = new AtomicBoolean ( true )
def optimizeLocalScoped_? ( ) = optimizeLocal . get
2011-09-27 11:49:42 +02:00
/* *
* Returns true if the actor was in the provider 's cache and evicted successfully , else false .
*/
2011-11-08 11:56:46 +01:00
private [ akka ] def evict ( path : String ) : Boolean = actors . remove ( path ) ne null
2011-11-03 14:53:38 +01:00
private [ akka ] def serialize ( actor : ActorRef ) : SerializedActorRef = actor match {
2011-11-08 11:56:46 +01:00
case r : RemoteActorRef ⇒ new SerializedActorRef ( r . remoteAddress , actor . path . toString )
2011-10-28 23:11:35 +02:00
case other ⇒ local . serialize ( actor )
2011-10-13 17:42:26 +02:00
}
2011-11-03 14:53:38 +01:00
private [ akka ] def deserialize ( actor : SerializedActorRef ) : Option [ ActorRef ] = {
2011-11-10 17:39:04 +01:00
val remoteAddress = RemoteAddress ( actor . hostname , actor . port )
2011-11-17 10:54:17 +01:00
if ( optimizeLocalScoped_? && remoteAddress == rootPath . remoteAddress ) {
2011-11-08 11:56:46 +01:00
local . actorFor ( ActorPath . split ( actor . path ) )
2011-11-03 14:53:38 +01:00
} else {
2011-11-17 10:54:17 +01:00
log . debug ( "{}: Creating RemoteActorRef with address [{}] connected to [{}]" , rootPath . remoteAddress , actor . path , remoteAddress )
Some ( RemoteActorRef ( remote . app . provider , remote . server , remoteAddress , rootPath / ActorPath . split ( actor . path ) , None ) ) //Should it be None here
2011-11-03 14:53:38 +01:00
}
}
2011-10-28 23:11:35 +02:00
2011-09-19 14:43:28 +02:00
/* *
* Using ( checking out ) actor on a specific node .
*/
2011-11-16 17:18:36 +01:00
def useActorOnNode ( app : ActorSystem , remoteAddress : RemoteAddress , actorPath : String , actorFactory : ( ) ⇒ Actor ) {
2011-11-17 10:54:17 +01:00
log . debug ( "[{}] Instantiating Actor [{}] on node [{}]" , rootPath , actorPath , remoteAddress )
2011-09-19 14:43:28 +02:00
val actorFactoryBytes =
2011-10-12 09:10:05 +02:00
app . serialization . serialize ( actorFactory ) match {
2011-10-28 23:11:35 +02:00
case Left ( error ) ⇒ throw error
case Right ( bytes ) ⇒ if ( remote . shouldCompressData ) LZF . compress ( bytes ) else bytes
2011-09-19 14:43:28 +02:00
}
2011-10-19 12:25:16 +02:00
val command = RemoteSystemDaemonMessageProtocol . newBuilder
2011-09-19 14:43:28 +02:00
. setMessageType ( USE )
2011-11-08 14:30:33 +01:00
. setActorPath ( actorPath )
2011-09-19 14:43:28 +02:00
. setPayload ( ByteString . copyFrom ( actorFactoryBytes ) )
. build ( )
2011-11-08 11:56:46 +01:00
val connectionFactory = ( ) ⇒ deserialize ( new SerializedActorRef ( remoteAddress , remote . remoteDaemon . path . toString ) ) . get
2011-09-19 14:43:28 +02:00
// try to get the connection for the remote address, if not already there then create it
2011-10-07 15:42:55 +02:00
val connection = remoteDaemonConnectionManager . putIfAbsent ( remoteAddress , connectionFactory )
2011-09-19 14:43:28 +02:00
2011-09-22 03:36:59 +02:00
sendCommandToRemoteNode ( connection , command , withACK = true ) // ensure we get an ACK on the USE command
2011-09-15 10:20:18 +02:00
}
2011-10-20 15:11:34 +02:00
private def sendCommandToRemoteNode ( connection : ActorRef , command : RemoteSystemDaemonMessageProtocol , withACK : Boolean ) {
2011-09-22 03:36:59 +02:00
if ( withACK ) {
2011-09-15 10:20:18 +02:00
try {
2011-10-28 23:11:35 +02:00
val f = connection ? ( command , remote . remoteSystemDaemonAckTimeout )
( try f . await . value catch { case _ : FutureTimeoutException ⇒ None } ) match {
case Some ( Right ( receiver ) ) ⇒
2011-10-27 12:23:01 +02:00
log . debug ( "Remote system command sent to [{}] successfully received" , receiver )
2011-09-15 10:20:18 +02:00
2011-10-28 23:11:35 +02:00
case Some ( Left ( cause ) ) ⇒
2011-10-27 12:23:01 +02:00
log . error ( cause , cause . toString )
2011-09-15 10:20:18 +02:00
throw cause
case None ⇒
2011-10-19 12:25:16 +02:00
val error = new RemoteException ( "Remote system command to [%s] timed out" . format ( connection . address ) )
2011-10-27 12:23:01 +02:00
log . error ( error , error . toString )
2011-09-15 10:20:18 +02:00
throw error
}
} catch {
case e : Exception ⇒
2011-10-27 12:23:01 +02:00
log . error ( e , "Could not send remote system command to [{}] due to: {}" , connection . address , e . toString )
2011-09-15 10:20:18 +02:00
throw e
}
2011-09-22 03:36:59 +02:00
} else {
connection ! command
2011-09-15 10:20:18 +02:00
}
}
2011-10-17 14:32:31 +02:00
private [ akka ] def createDeathWatch ( ) : DeathWatch = local . createDeathWatch ( ) //FIXME Implement Remote DeathWatch
2011-10-21 15:51:38 +02:00
private [ akka ] def ask ( message : Any , recipient : ActorRef , within : Timeout ) : Future [ Any ] = local . ask ( message , recipient , within )
2011-11-12 10:57:28 +01:00
2011-11-13 20:38:14 +01:00
private [ akka ] def tempPath = local . tempPath
2011-09-15 10:20:18 +02:00
}
2011-10-13 17:42:26 +02:00
/* *
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node .
* This reference is network - aware ( remembers its origin ) and immutable .
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
private [ akka ] case class RemoteActorRef private [ akka ] (
2011-11-16 17:18:36 +01:00
provider : ActorRefProvider ,
2011-10-22 16:06:20 +02:00
remote : RemoteSupport ,
2011-11-10 17:39:04 +01:00
remoteAddress : RemoteAddress ,
2011-11-08 11:56:46 +01:00
path : ActorPath ,
2011-10-13 17:42:26 +02:00
loader : Option [ ClassLoader ] )
extends ActorRef with ScalaActorRef {
2011-11-08 11:56:46 +01:00
2011-10-13 17:42:26 +02:00
@volatile
private var running : Boolean = true
2011-11-08 11:56:46 +01:00
def name = path . name
2011-11-10 18:06:16 +01:00
def address = remoteAddress + path . toString
2011-11-08 11:56:46 +01:00
2011-10-13 17:42:26 +02:00
def isShutdown : Boolean = ! running
2011-10-18 15:39:26 +02:00
protected [ akka ] def sendSystemMessage ( message : SystemMessage ) : Unit = unsupported
2011-11-11 12:59:19 +01:00
def tell ( message : Any , sender : ActorRef ) : Unit = remote . send ( message , Option ( sender ) , remoteAddress , this , loader )
2011-10-13 17:42:26 +02:00
2011-11-16 17:18:36 +01:00
def ? ( message : Any ) ( implicit timeout : Timeout ) : Future [ Any ] = provider . ask ( message , this , timeout )
2011-10-13 17:42:26 +02:00
2011-11-03 14:53:38 +01:00
def suspend ( ) : Unit = ( )
2011-10-13 17:42:26 +02:00
2011-11-03 14:53:38 +01:00
def resume ( ) : Unit = ( )
2011-10-13 17:42:26 +02:00
def stop ( ) { //FIXME send the cause as well!
synchronized {
if ( running ) {
running = false
2011-11-03 18:33:57 +01:00
remote . send ( new Terminate ( ) , None , remoteAddress , this , loader )
2011-10-13 17:42:26 +02:00
}
}
}
@throws ( classOf [ java . io . ObjectStreamException ] )
2011-11-16 17:18:36 +01:00
private def writeReplace ( ) : AnyRef = provider . serialize ( this )
2011-10-13 17:42:26 +02:00
2011-11-03 14:53:38 +01:00
def startsMonitoring ( actorRef : ActorRef ) : ActorRef = unsupported //FIXME Implement
2011-10-13 17:42:26 +02:00
2011-11-03 14:53:38 +01:00
def stopsMonitoring ( actorRef : ActorRef ) : ActorRef = unsupported //FIXME Implement
2011-10-13 17:42:26 +02:00
2011-11-03 14:53:38 +01:00
protected [ akka ] def restart ( cause : Throwable ) : Unit = ( )
2011-10-13 17:42:26 +02:00
private def unsupported = throw new UnsupportedOperationException ( "Not supported for RemoteActorRef" )
}