2011-09-15 10:20:18 +02:00
/* *
* Copyright ( C ) 2009 - 2011 Typesafe Inc . < http : //www.typesafe.com>
*/
2011-09-20 21:44:50 +02:00
package akka.remote
2011-09-15 10:20:18 +02:00
2011-11-10 20:08:00 +01:00
import akka.actor.ActorSystem
2011-09-15 10:20:18 +02:00
import akka.actor._
2011-10-27 12:23:01 +02:00
import akka.event.Logging
2011-09-30 14:52:07 +02:00
import akka.actor.Status._
2011-09-15 10:20:18 +02:00
import akka.util._
2011-09-30 14:52:07 +02:00
import akka.util.duration._
import akka.util.Helpers._
import akka.actor.DeploymentConfig._
2011-10-19 12:25:16 +02:00
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
2011-09-15 10:20:18 +02:00
import java.net.InetSocketAddress
import com.eaio.uuid.UUID
2011-11-03 14:53:38 +01:00
import akka.serialization. { JavaSerializer , Serialization , Serializer , Compression }
2011-11-09 12:41:37 +01:00
import akka.dispatch. { Terminate , Dispatchers , Future , PinnedDispatcher }
2011-11-16 17:18:36 +01:00
import java.util.concurrent.atomic.AtomicLong
2011-09-15 10:20:18 +02:00
/* *
2011-10-19 12:25:16 +02:00
* Remote module - contains remote client and server config , remote server instance , remote daemon , remote dispatchers etc .
*
2011-09-15 10:20:18 +02:00
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2011-11-16 17:18:36 +01:00
class Remote ( val app : ActorSystemImpl , val nodename : String ) {
2011-10-12 09:10:05 +02:00
2011-10-27 12:23:01 +02:00
val log = Logging ( app , this )
2011-10-19 12:25:16 +02:00
import app._
2011-11-17 11:51:14 +01:00
val AC = settings
2011-11-16 17:18:36 +01:00
import AC._
2011-10-12 09:10:05 +02:00
2011-11-17 11:51:14 +01:00
// TODO move to settings?
2011-09-19 15:21:18 +02:00
val shouldCompressData = config . getBool ( "akka.remote.use-compression" , false )
2011-10-19 12:25:16 +02:00
val remoteSystemDaemonAckTimeout = Duration ( config . getInt ( "akka.remote.remote-daemon-ack-timeout" , 30 ) , DefaultTimeUnit ) . toMillis . toInt
2011-09-15 10:20:18 +02:00
2011-11-10 11:50:11 +01:00
val failureDetector = new AccrualFailureDetector ( app )
2011-10-19 12:25:16 +02:00
2011-10-20 15:40:05 +02:00
// val gossiper = new Gossiper(this)
2011-10-19 12:25:16 +02:00
val remoteDaemonServiceName = "akka-system-remote-daemon" . intern
2011-09-15 10:20:18 +02:00
// FIXME configure computeGridDispatcher to what?
2011-10-19 12:25:16 +02:00
val computeGridDispatcher = dispatcherFactory . newDispatcher ( "akka:compute-grid" ) . build
2011-09-15 10:20:18 +02:00
2011-11-16 17:18:36 +01:00
// FIXME it is probably better to create another supervisor for handling the children created by handle_*
2011-10-18 17:56:23 +02:00
private [ remote ] lazy val remoteDaemonSupervisor = app . actorOf ( Props (
2011-11-08 11:56:46 +01:00
OneForOneStrategy ( List ( classOf [ Exception ] ) , None , None ) ) , "akka-system-remote-supervisor" ) // is infinite restart what we want?
2011-09-30 14:52:07 +02:00
private [ remote ] lazy val remoteDaemon =
2011-11-16 17:18:36 +01:00
app . provider . actorOf ( app ,
2011-10-28 23:11:35 +02:00
Props ( new RemoteSystemDaemon ( this ) ) . withDispatcher ( dispatcherFactory . newPinnedDispatcher ( remoteDaemonServiceName ) ) ,
2011-10-21 15:11:43 +02:00
remoteDaemonSupervisor ,
2011-10-18 15:39:26 +02:00
remoteDaemonServiceName ,
2011-09-30 14:52:07 +02:00
systemService = true )
2011-09-15 10:20:18 +02:00
2011-10-18 17:56:23 +02:00
private [ remote ] lazy val remoteClientLifeCycleHandler = app . actorOf ( Props ( new Actor {
2011-09-15 10:20:18 +02:00
def receive = {
2011-11-10 12:23:39 +01:00
case RemoteClientError ( cause , remote , address ) ⇒ remote . shutdownClientConnection ( address )
case RemoteClientDisconnected ( remote , address ) ⇒ remote . shutdownClientConnection ( address )
2011-09-15 10:20:18 +02:00
case _ ⇒ //ignore other
}
2011-10-19 12:25:16 +02:00
} ) , "akka.remote.RemoteClientLifeCycleListener" )
2011-10-12 11:34:35 +02:00
2011-10-12 09:10:05 +02:00
lazy val eventStream = new NetworkEventStream ( app )
2011-09-15 10:20:18 +02:00
lazy val server : RemoteSupport = {
2011-10-12 09:10:05 +02:00
val remote = new akka . remote . netty . NettyRemoteSupport ( app )
2011-11-09 15:04:57 +01:00
remote . start ( ) //TODO FIXME Any application loader here?
2011-10-22 16:06:20 +02:00
2011-11-10 20:48:50 +01:00
app . eventStream . subscribe ( eventStream . sender , classOf [ RemoteLifeCycleEvent ] )
app . eventStream . subscribe ( remoteClientLifeCycleHandler , classOf [ RemoteLifeCycleEvent ] )
2011-10-22 16:06:20 +02:00
2011-10-13 14:23:44 +02:00
// TODO actually register this provider in app in remote mode
2011-10-19 12:25:16 +02:00
//provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider)
2011-09-15 10:20:18 +02:00
remote
}
2011-11-03 14:53:38 +01:00
def start ( ) : Unit = {
2011-11-17 10:54:17 +01:00
val serverAddress = server . app . rootPath . remoteAddress //Force init of server
2011-11-03 14:53:38 +01:00
val daemonAddress = remoteDaemon . address //Force init of daemon
2011-11-09 14:56:05 +01:00
log . info ( "Starting remote server on [{}] and starting remoteDaemon with address [{}]" , serverAddress , daemonAddress )
2011-09-22 03:36:59 +02:00
}
2011-09-15 10:20:18 +02:00
}
/* *
2011-10-19 12:25:16 +02:00
* Internal system "daemon" actor for remote internal communication .
2011-09-15 10:20:18 +02:00
*
2011-10-19 12:25:16 +02:00
* It acts as the brain of the remote that responds to system remote events ( messages ) and undertakes action .
2011-09-15 10:20:18 +02:00
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2011-10-19 12:25:16 +02:00
class RemoteSystemDaemon ( remote : Remote ) extends Actor {
2011-10-12 11:34:35 +02:00
2011-10-12 09:10:05 +02:00
import remote._
2011-10-12 11:34:35 +02:00
2011-09-15 10:20:18 +02:00
override def preRestart ( reason : Throwable , msg : Option [ Any ] ) {
2011-10-27 12:23:01 +02:00
log . debug ( "RemoteSystemDaemon failed due to [{}] - restarting..." , reason )
2011-09-15 10:20:18 +02:00
}
2011-10-12 09:10:05 +02:00
def receive : Actor . Receive = {
2011-10-19 12:25:16 +02:00
case message : RemoteSystemDaemonMessageProtocol ⇒
2011-11-09 14:56:05 +01:00
log . debug ( "Received command [\n{}] to RemoteSystemDaemon on [{}]" , message . getMessageType , nodename )
2011-09-15 10:20:18 +02:00
message . getMessageType match {
case USE ⇒ handleUse ( message )
case RELEASE ⇒ handleRelease ( message )
// case STOP ⇒ cluster.shutdown()
// case DISCONNECT ⇒ cluster.disconnect()
// case RECONNECT ⇒ cluster.reconnect()
// case RESIGN ⇒ cluster.resign()
// case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message)
2011-10-19 12:25:16 +02:00
case GOSSIP ⇒ handleGossip ( message )
2011-09-15 10:20:18 +02:00
case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit ( message )
case FUNCTION_FUN0_ANY ⇒ handle_fun0_any ( message )
case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit ( message )
case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any ( message )
//TODO: should we not deal with unrecognized message types?
}
2011-10-27 12:23:01 +02:00
case unknown ⇒ log . warning ( "Unknown message to RemoteSystemDaemon [{}]" , unknown )
2011-09-15 10:20:18 +02:00
}
2011-10-19 12:25:16 +02:00
def handleUse ( message : RemoteSystemDaemonMessageProtocol ) {
2011-09-15 10:20:18 +02:00
try {
2011-11-08 14:30:33 +01:00
if ( message . hasActorPath ) {
2011-09-19 14:43:28 +02:00
val actorFactoryBytes =
2011-10-28 23:11:35 +02:00
if ( shouldCompressData ) LZF . uncompress ( message . getPayload . toByteArray ) else message . getPayload . toByteArray
2011-09-19 14:43:28 +02:00
val actorFactory =
2011-10-27 12:23:01 +02:00
app . serialization . deserialize ( actorFactoryBytes , classOf [ ( ) ⇒ Actor ] , None ) match {
2011-09-15 10:20:18 +02:00
case Left ( error ) ⇒ throw error
2011-09-19 14:43:28 +02:00
case Right ( instance ) ⇒ instance . asInstanceOf [ ( ) ⇒ Actor ]
2011-09-15 10:20:18 +02:00
}
2011-11-08 14:30:33 +01:00
val actorPath = ActorPath ( remote . app , message . getActorPath )
2011-11-16 17:18:36 +01:00
val parent = app . actorFor ( actorPath . parent )
2011-11-08 11:56:46 +01:00
if ( parent . isDefined ) {
2011-11-16 17:18:36 +01:00
app . provider . actorOf ( app , Props ( creator = actorFactory ) , parent . get , actorPath . name )
2011-11-08 11:56:46 +01:00
} else {
log . error ( "Parent actor does not exist, ignoring remote system daemon command [{}]" , message )
}
2011-09-15 10:20:18 +02:00
} else {
2011-10-27 12:23:01 +02:00
log . error ( "Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]" , message )
2011-09-15 10:20:18 +02:00
}
2011-11-10 19:03:18 +01:00
sender ! Success ( app . address )
2011-09-15 10:20:18 +02:00
} catch {
2011-10-28 23:11:35 +02:00
case error : Throwable ⇒ //FIXME doesn't seem sensible
2011-10-22 16:06:20 +02:00
sender ! Failure ( error )
2011-09-15 10:20:18 +02:00
throw error
}
}
2011-10-19 12:25:16 +02:00
// FIXME implement handleRelease
def handleRelease ( message : RemoteSystemDaemonMessageProtocol ) {
}
def handleGossip ( message : RemoteSystemDaemonMessageProtocol ) {
2011-10-20 15:40:05 +02:00
// try {
// val gossip = serialization.deserialize(message.getPayload.toByteArray, classOf[Gossip], None) match {
// case Left(error) ⇒ throw error
// case Right(instance) ⇒ instance.asInstanceOf[Gossip]
// }
// gossiper tell gossip
2011-10-22 16:06:20 +02:00
// sender ! Success(address.toString)
2011-10-20 15:40:05 +02:00
// } catch {
// case error: Throwable ⇒
2011-10-22 16:06:20 +02:00
// sender ! Failure(error)
2011-10-20 15:40:05 +02:00
// throw error
// }
2011-09-15 10:20:18 +02:00
}
2011-11-16 17:18:36 +01:00
/*
* generate name for temporary actor refs
*/
private val tempNumber = new AtomicLong
def tempName = {
val l = tempNumber . getAndIncrement ( )
"$_" + Helpers . base64 ( l )
}
def tempPath = remoteDaemon . path / tempName
2011-10-18 15:39:26 +02:00
// FIXME: handle real remote supervision
2011-10-19 12:25:16 +02:00
def handle_fun0_unit ( message : RemoteSystemDaemonMessageProtocol ) {
2011-10-12 09:10:05 +02:00
new LocalActorRef ( app ,
2011-09-15 10:20:18 +02:00
Props (
2011-09-29 12:44:52 +02:00
context ⇒ {
case f : Function0 [ _ ] ⇒ try { f ( ) } finally { context . self . stop ( ) }
2011-11-16 17:18:36 +01:00
} ) . copy ( dispatcher = computeGridDispatcher ) , remoteDaemon , tempPath , systemService = true ) ! payloadFor ( message , classOf [ Function0 [ Unit ] ] )
2011-09-15 10:20:18 +02:00
}
2011-10-18 15:39:26 +02:00
// FIXME: handle real remote supervision
2011-10-19 12:25:16 +02:00
def handle_fun0_any ( message : RemoteSystemDaemonMessageProtocol ) {
2011-10-12 09:10:05 +02:00
new LocalActorRef ( app ,
2011-09-15 10:20:18 +02:00
Props (
2011-09-29 12:44:52 +02:00
context ⇒ {
2011-10-22 16:06:20 +02:00
case f : Function0 [ _ ] ⇒ try { sender ! f ( ) } finally { context . self . stop ( ) }
2011-11-16 17:18:36 +01:00
} ) . copy ( dispatcher = computeGridDispatcher ) , remoteDaemon , tempPath , systemService = true ) forward payloadFor ( message , classOf [ Function0 [ Any ] ] )
2011-09-15 10:20:18 +02:00
}
2011-10-18 15:39:26 +02:00
// FIXME: handle real remote supervision
2011-10-19 12:25:16 +02:00
def handle_fun1_arg_unit ( message : RemoteSystemDaemonMessageProtocol ) {
2011-10-12 09:10:05 +02:00
new LocalActorRef ( app ,
2011-09-15 10:20:18 +02:00
Props (
2011-09-29 12:44:52 +02:00
context ⇒ {
case ( fun : Function [ _ , _ ] , param : Any ) ⇒ try { fun . asInstanceOf [ Any ⇒ Unit ] . apply ( param ) } finally { context . self . stop ( ) }
2011-11-16 17:18:36 +01:00
} ) . copy ( dispatcher = computeGridDispatcher ) , remoteDaemon , tempPath , systemService = true ) ! payloadFor ( message , classOf [ Tuple2 [ Function1 [ Any , Unit ] , Any ] ] )
2011-09-15 10:20:18 +02:00
}
2011-10-18 15:39:26 +02:00
// FIXME: handle real remote supervision
2011-10-19 12:25:16 +02:00
def handle_fun1_arg_any ( message : RemoteSystemDaemonMessageProtocol ) {
2011-10-12 09:10:05 +02:00
new LocalActorRef ( app ,
2011-09-15 10:20:18 +02:00
Props (
2011-09-29 12:44:52 +02:00
context ⇒ {
2011-10-22 16:06:20 +02:00
case ( fun : Function [ _ , _ ] , param : Any ) ⇒ try { sender ! fun . asInstanceOf [ Any ⇒ Any ] ( param ) } finally { context . self . stop ( ) }
2011-11-16 17:18:36 +01:00
} ) . copy ( dispatcher = computeGridDispatcher ) , remoteDaemon , tempPath , systemService = true ) forward payloadFor ( message , classOf [ Tuple2 [ Function1 [ Any , Any ] , Any ] ] )
2011-09-15 10:20:18 +02:00
}
2011-10-19 12:25:16 +02:00
def handleFailover ( message : RemoteSystemDaemonMessageProtocol ) {
2011-09-22 03:36:59 +02:00
// val (from, to) = payloadFor(message, classOf[(InetSocketremoteDaemonServiceName, InetSocketremoteDaemonServiceName)])
2011-09-15 10:20:18 +02:00
// cluster.failOverClusterActorRefConnections(from, to)
}
2011-10-19 12:25:16 +02:00
private def payloadFor [ T ] ( message : RemoteSystemDaemonMessageProtocol , clazz : Class [ T ] ) : T = {
2011-10-27 12:23:01 +02:00
app . serialization . deserialize ( message . getPayload . toByteArray , clazz , None ) match {
2011-09-15 10:20:18 +02:00
case Left ( error ) ⇒ throw error
case Right ( instance ) ⇒ instance . asInstanceOf [ T ]
}
}
}
2011-11-03 14:53:38 +01:00
class RemoteMessage ( input : RemoteMessageProtocol , remote : RemoteSupport , classLoader : Option [ ClassLoader ] = None ) {
2011-11-16 17:18:36 +01:00
val provider = remote . app . asInstanceOf [ ActorSystemImpl ] . provider
2011-11-03 14:53:38 +01:00
lazy val sender : ActorRef =
if ( input . hasSender )
2011-11-16 17:18:36 +01:00
provider . deserialize (
2011-11-08 14:30:33 +01:00
SerializedActorRef ( input . getSender . getHost , input . getSender . getPort , input . getSender . getPath ) ) . getOrElse ( throw new IllegalStateException ( "OHNOES" ) )
2011-11-03 14:53:38 +01:00
else
remote . app . deadLetters
2011-11-08 11:56:46 +01:00
2011-11-08 14:30:33 +01:00
lazy val recipient : ActorRef = remote . app . actorFor ( input . getRecipient . getPath ) . getOrElse ( remote . app . deadLetters )
2011-11-03 14:53:38 +01:00
lazy val payload : Either [ Throwable , AnyRef ] =
if ( input . hasException ) Left ( parseException ( ) )
else Right ( MessageSerializer . deserialize ( remote . app , input . getMessage , classLoader ) )
protected def parseException ( ) : Throwable = {
val exception = input . getException
val classname = exception . getClassname
try {
val exceptionClass =
if ( classLoader . isDefined ) classLoader . get . loadClass ( classname ) else Class . forName ( classname )
exceptionClass
. getConstructor ( Array [ Class [ _ ] ] ( classOf [ String ] ) : _ * )
. newInstance ( exception . getMessage ) . asInstanceOf [ Throwable ]
} catch {
case problem : Exception ⇒
2011-11-10 20:48:50 +01:00
remote . app . eventStream . publish ( Logging . Error ( problem , remote , problem . getMessage ) )
2011-11-03 14:53:38 +01:00
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException ( problem , classname , exception . getMessage )
}
}
2011-11-08 14:30:33 +01:00
override def toString = "RemoteMessage: " + recipient + "(" + input . getRecipient . getPath + ") from " + sender
2011-11-03 14:53:38 +01:00
}
2011-11-03 15:42:46 +01:00
trait RemoteMarshallingOps {
2011-11-10 20:08:00 +01:00
def app : ActorSystem
2011-11-03 15:42:46 +01:00
def createMessageSendEnvelope ( rmp : RemoteMessageProtocol ) : AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol . newBuilder
arp . setMessage ( rmp )
arp . build
}
def createControlEnvelope ( rcp : RemoteControlProtocol ) : AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol . newBuilder
arp . setInstruction ( rcp )
arp . build
}
/* *
* Serializes the ActorRef instance into a Protocol Buffers ( protobuf ) Message .
*/
def toRemoteActorRefProtocol ( actor : ActorRef ) : ActorRefProtocol = {
2011-11-16 17:18:36 +01:00
val rep = app . asInstanceOf [ ActorSystemImpl ] . provider . serialize ( actor )
2011-11-08 14:30:33 +01:00
ActorRefProtocol . newBuilder . setHost ( rep . hostname ) . setPort ( rep . port ) . setPath ( rep . path ) . build
2011-11-03 15:42:46 +01:00
}
def createRemoteMessageProtocolBuilder (
recipient : Either [ ActorRef , ActorRefProtocol ] ,
message : Either [ Throwable , Any ] ,
senderOption : Option [ ActorRef ] ) : RemoteMessageProtocol . Builder = {
val messageBuilder = RemoteMessageProtocol . newBuilder . setRecipient ( recipient . fold ( toRemoteActorRefProtocol _ , identity ) )
message match {
case Right ( message ) ⇒
messageBuilder . setMessage ( MessageSerializer . serialize ( app , message . asInstanceOf [ AnyRef ] ) )
case Left ( exception ) ⇒
messageBuilder . setException ( ExceptionProtocol . newBuilder
. setClassname ( exception . getClass . getName )
. setMessage ( Option ( exception . getMessage ) . getOrElse ( "" ) )
. build )
}
if ( senderOption . isDefined ) messageBuilder . setSender ( toRemoteActorRefProtocol ( senderOption . get ) )
messageBuilder
}
2011-11-09 12:41:37 +01:00
def receiveMessage ( remoteMessage : RemoteMessage , untrustedMode : Boolean ) {
val recipient = remoteMessage . recipient
remoteMessage . payload match {
case Left ( t ) ⇒ throw t
case Right ( r ) ⇒ r match {
case _ : Terminate ⇒ if ( untrustedMode ) throw new SecurityException ( "RemoteModule server is operating is untrusted mode, can not stop the actor" ) else recipient . stop ( )
case _ : AutoReceivedMessage if ( untrustedMode ) ⇒ throw new SecurityException ( "RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor" )
case m ⇒ recipient . ! ( m ) ( remoteMessage . sender )
}
}
}
2011-11-03 15:42:46 +01:00
}