2009-06-24 15:12:47 +02:00
/* *
2009-12-27 16:01:53 +01:00
* Copyright ( C ) 2009 - 2010 Scalable Solutions AB < http : //scalablesolutions.se>
2009-06-24 15:12:47 +02:00
*/
2009-12-16 23:20:15 +01:00
package se.scalablesolutions.akka.remote
2009-06-24 15:12:47 +02:00
2009-12-16 23:20:15 +01:00
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol. { RemoteRequest , RemoteReply }
2009-10-22 11:14:36 +02:00
import se.scalablesolutions.akka.actor. { Exit , Actor }
import se.scalablesolutions.akka.dispatch. { DefaultCompletableFutureResult , CompletableFutureResult }
2009-12-30 09:24:10 +01:00
import se.scalablesolutions.akka.util. { UUID , Logging }
2009-10-22 11:14:36 +02:00
import se.scalablesolutions.akka.Config.config
2009-07-18 00:16:32 +02:00
2009-07-01 15:29:06 +02:00
import org.jboss.netty.channel._
2010-02-15 16:21:26 +01:00
import group.DefaultChannelGroup
2009-07-01 15:29:06 +02:00
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
2009-11-24 17:41:08 +01:00
import org.jboss.netty.bootstrap.ClientBootstrap
2009-07-18 00:16:32 +02:00
import org.jboss.netty.handler.codec.frame. { LengthFieldBasedFrameDecoder , LengthFieldPrepender }
2009-11-24 17:41:08 +01:00
import org.jboss.netty.handler.codec.compression. { ZlibDecoder , ZlibEncoder }
2009-07-18 00:16:32 +02:00
import org.jboss.netty.handler.codec.protobuf. { ProtobufDecoder , ProtobufEncoder }
2009-10-22 11:14:36 +02:00
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
import org.jboss.netty.util. { TimerTask , Timeout , HashedWheelTimer }
2009-06-24 15:12:47 +02:00
2009-12-29 14:24:48 +01:00
import java.net. { SocketAddress , InetSocketAddress }
2009-10-22 11:14:36 +02:00
import java.util.concurrent. { TimeUnit , Executors , ConcurrentMap , ConcurrentHashMap }
2009-12-27 08:24:11 +01:00
import java.util.concurrent.atomic.AtomicLong
2010-02-16 15:39:09 +01:00
import scala.collection.mutable. { HashSet , HashMap }
2009-12-27 22:56:55 +01:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-12-27 08:24:11 +01:00
object RemoteRequestIdFactory {
2009-12-30 09:24:10 +01:00
private val nodeId = UUID . newUuid
2009-12-27 08:24:11 +01:00
private val id = new AtomicLong
2010-02-16 15:39:09 +01:00
2009-12-27 08:24:11 +01:00
def nextId : Long = id . getAndIncrement + nodeId
}
2009-07-23 20:01:37 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-07-01 15:29:06 +02:00
object RemoteClient extends Logging {
2009-10-22 11:14:36 +02:00
val READ_TIMEOUT = config . getInt ( "akka.remote.client.read-timeout" , 10000 )
val RECONNECT_DELAY = config . getInt ( "akka.remote.client.reconnect-delay" , 5000 )
2010-02-15 16:21:26 +01:00
private val remoteClients = new HashMap [ String , RemoteClient ]
private val remoteActors = new HashMap [ RemoteServer . Address , HashSet [ String ] ]
2010-02-16 15:39:09 +01:00
// FIXME: simplify overloaded methods when we have Scala 2.8
2010-02-17 15:32:17 +01:00
2010-02-16 15:39:09 +01:00
def actorFor ( className : String , hostname : String , port : Int ) : Actor =
2010-02-17 15:32:17 +01:00
actorFor ( className , className , 5000L , hostname , port )
2010-02-16 15:39:09 +01:00
def actorFor ( actorId : String , className : String , hostname : String , port : Int ) : Actor =
2010-02-17 15:32:17 +01:00
actorFor ( actorId , className , 5000L , hostname , port )
2010-02-16 15:39:09 +01:00
def actorFor ( className : String , timeout : Long , hostname : String , port : Int ) : Actor =
actorFor ( className , className , timeout , hostname , port )
def actorFor ( actorId : String , className : String , timeout : Long , hostname : String , port : Int ) : Actor = {
new Actor {
start
val remoteClient = RemoteClient . clientFor ( hostname , port )
override def postMessageToMailbox ( message : Any , sender : Option [ Actor ] ) : Unit = {
val requestBuilder = RemoteRequest . newBuilder
. setId ( RemoteRequestIdFactory . nextId )
. setTarget ( className )
. setTimeout ( timeout )
. setUuid ( actorId )
. setIsActor ( true )
. setIsOneWay ( true )
. setIsEscaped ( false )
if ( sender . isDefined ) {
val s = sender . get
requestBuilder . setSourceTarget ( s . getClass . getName )
requestBuilder . setSourceUuid ( s . uuid )
val ( host , port ) = s . _replyToAddress . map ( a => ( a . getHostName , a . getPort ) ) . getOrElse ( ( Actor . HOSTNAME , Actor . PORT ) )
requestBuilder . setSourceHostname ( host )
requestBuilder . setSourcePort ( port )
}
RemoteProtocolBuilder . setMessage ( message , requestBuilder )
remoteClient . send ( requestBuilder . build , None )
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout (
message : Any ,
timeout : Long ,
senderFuture : Option [ CompletableFutureResult ] ) : CompletableFutureResult = {
val requestBuilder = RemoteRequest . newBuilder
. setId ( RemoteRequestIdFactory . nextId )
. setTarget ( className )
. setTimeout ( timeout )
. setUuid ( actorId )
. setIsActor ( true )
. setIsOneWay ( false )
. setIsEscaped ( false )
RemoteProtocolBuilder . setMessage ( message , requestBuilder )
val future = remoteClient . send ( requestBuilder . build , senderFuture )
if ( future . isDefined ) future . get
else throw new IllegalStateException ( "Expected a future from remote call to actor " + toString )
}
def receive = { case _ => { } }
}
}
2010-02-15 16:21:26 +01:00
def clientFor ( hostname : String , port : Int ) : RemoteClient = clientFor ( new InetSocketAddress ( hostname , port ) )
2009-10-22 11:14:36 +02:00
2009-07-02 13:23:03 +02:00
def clientFor ( address : InetSocketAddress ) : RemoteClient = synchronized {
val hostname = address . getHostName
val port = address . getPort
2009-10-22 11:14:36 +02:00
val hash = hostname + ':' + port
2010-02-15 16:21:26 +01:00
if ( remoteClients . contains ( hash ) ) remoteClients ( hash )
2009-07-02 13:23:03 +02:00
else {
val client = new RemoteClient ( hostname , port )
client . connect
2010-02-15 16:21:26 +01:00
remoteClients += hash -> client
2009-07-02 13:23:03 +02:00
client
}
}
2009-12-14 19:22:37 +01:00
2010-02-15 16:21:26 +01:00
def shutdownClientFor ( address : InetSocketAddress ) = synchronized {
val hostname = address . getHostName
val port = address . getPort
val hash = hostname + ':' + port
if ( remoteClients . contains ( hash ) ) {
val client = remoteClients ( hash )
client . shutdown
remoteClients - hash
}
}
2009-12-27 22:56:55 +01:00
/* *
* Clean - up all open connections .
*/
2010-02-15 16:21:26 +01:00
def shutdownAll = synchronized {
remoteClients . foreach ( { case ( addr , client ) => client . shutdown } )
remoteClients . clear
}
private [ akka ] def register ( hostname : String , port : Int , uuid : String ) = synchronized {
actorsFor ( RemoteServer . Address ( hostname , port ) ) + uuid
}
// TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback
private [ akka ] def unregister ( hostname : String , port : Int , uuid : String ) = synchronized {
val set = actorsFor ( RemoteServer . Address ( hostname , port ) )
set - uuid
if ( set . isEmpty ) shutdownClientFor ( new InetSocketAddress ( hostname , port ) )
}
private [ akka ] def actorsFor ( remoteServerAddress : RemoteServer . Address ) : HashSet [ String ] = {
val set = remoteActors . get ( remoteServerAddress )
if ( set . isDefined && ( set . get ne null ) ) set . get
else {
val remoteActorSet = new HashSet [ String ]
remoteActors . put ( remoteServerAddress , remoteActorSet )
remoteActorSet
}
2009-12-14 19:22:37 +01:00
}
2009-07-02 13:23:03 +02:00
}
2009-06-24 15:12:47 +02:00
2009-07-23 20:01:37 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-07-02 13:23:03 +02:00
class RemoteClient ( hostname : String , port : Int ) extends Logging {
2010-02-15 16:21:26 +01:00
val name = "RemoteClient@" + hostname + "::" + port
@volatile private [ remote ] var isRunning = false
2009-06-24 15:12:47 +02:00
private val futures = new ConcurrentHashMap [ Long , CompletableFutureResult ]
2009-07-01 15:29:06 +02:00
private val supervisors = new ConcurrentHashMap [ String , Actor ]
2009-06-24 15:12:47 +02:00
private val channelFactory = new NioClientSocketChannelFactory (
Executors . newCachedThreadPool ,
Executors . newCachedThreadPool )
private val bootstrap = new ClientBootstrap ( channelFactory )
2010-02-15 16:21:26 +01:00
private val openChannels = new DefaultChannelGroup ( classOf [ RemoteClient ] . getName ) ;
2009-06-24 15:12:47 +02:00
2009-12-14 19:22:37 +01:00
private val timer = new HashedWheelTimer
2009-12-29 14:24:48 +01:00
private val remoteAddress = new InetSocketAddress ( hostname , port )
2009-12-30 08:36:24 +01:00
private [ remote ] var connection : ChannelFuture = _
2009-12-14 19:22:37 +01:00
2009-12-30 08:36:24 +01:00
bootstrap . setPipelineFactory ( new RemoteClientPipelineFactory ( name , futures , supervisors , bootstrap , remoteAddress , timer , this ) )
2009-06-24 15:12:47 +02:00
bootstrap . setOption ( "tcpNoDelay" , true )
bootstrap . setOption ( "keepAlive" , true )
2009-06-25 13:07:58 +02:00
def connect = synchronized {
if ( ! isRunning ) {
2009-12-29 14:24:48 +01:00
connection = bootstrap . connect ( remoteAddress )
2009-07-02 13:23:03 +02:00
log . info ( "Starting remote client connection to [%s:%s]" , hostname , port )
2009-06-25 13:07:58 +02:00
// Wait until the connection attempt succeeds or fails.
2010-02-15 16:21:26 +01:00
val channel = connection . awaitUninterruptibly . getChannel
openChannels . add ( channel )
if ( ! connection . isSuccess ) log . error ( connection . getCause , "Remote client connection to [%s:%s] has failed" , hostname , port )
2009-06-25 13:07:58 +02:00
isRunning = true
}
2009-06-24 15:12:47 +02:00
}
2009-06-25 13:07:58 +02:00
def shutdown = synchronized {
2010-02-15 16:21:26 +01:00
if ( isRunning ) {
isRunning = false
openChannels . close . awaitUninterruptibly
bootstrap . releaseExternalResources
timer . stop
log . info ( "%s has been shut down" , name )
2009-06-25 13:07:58 +02:00
}
2009-06-24 15:12:47 +02:00
}
2009-12-27 22:56:55 +01:00
def send ( request : RemoteRequest , senderFuture : Option [ CompletableFutureResult ] ) : Option [ CompletableFutureResult ] = if ( isRunning ) {
2009-07-18 00:16:32 +02:00
if ( request . getIsOneWay ) {
connection . getChannel . write ( request )
2009-06-30 16:01:50 +02:00
None
2009-06-24 15:12:47 +02:00
} else {
futures . synchronized {
2009-12-27 22:56:55 +01:00
val futureResult = if ( senderFuture . isDefined ) senderFuture . get
2010-02-16 15:39:09 +01:00
else new DefaultCompletableFutureResult ( request . getTimeout )
2009-07-18 00:16:32 +02:00
futures . put ( request . getId , futureResult )
connection . getChannel . write ( request )
2009-06-30 16:01:50 +02:00
Some ( futureResult )
2010-02-15 16:21:26 +01:00
}
2009-06-24 15:12:47 +02:00
}
2009-07-01 15:29:06 +02:00
} else throw new IllegalStateException ( "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it." )
def registerSupervisorForActor ( actor : Actor ) =
2009-10-28 13:20:28 +01:00
if ( ! actor . _supervisor . isDefined ) throw new IllegalStateException ( "Can't register supervisor for " + actor + " since it is not under supervision" )
else supervisors . putIfAbsent ( actor . _supervisor . get . uuid , actor )
2009-07-01 15:29:06 +02:00
def deregisterSupervisorForActor ( actor : Actor ) =
2009-10-28 13:20:28 +01:00
if ( ! actor . _supervisor . isDefined ) throw new IllegalStateException ( "Can't unregister supervisor for " + actor + " since it is not under supervision" )
else supervisors . remove ( actor . _supervisor . get . uuid )
2010-02-15 16:21:26 +01:00
2009-07-01 15:29:06 +02:00
def deregisterSupervisorWithUuid ( uuid : String ) = supervisors . remove ( uuid )
2009-07-18 00:16:32 +02:00
}
2009-06-24 15:12:47 +02:00
2009-07-27 21:21:28 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2010-02-15 16:21:26 +01:00
class RemoteClientPipelineFactory ( name : String ,
2009-08-11 12:16:50 +02:00
futures : ConcurrentMap [ Long , CompletableFutureResult ] ,
2009-10-22 11:14:36 +02:00
supervisors : ConcurrentMap [ String , Actor ] ,
2009-12-14 19:22:37 +01:00
bootstrap : ClientBootstrap ,
2009-12-29 14:24:48 +01:00
remoteAddress : SocketAddress ,
2009-12-30 08:36:24 +01:00
timer : HashedWheelTimer ,
client : RemoteClient ) extends ChannelPipelineFactory {
2009-07-18 00:16:32 +02:00
def getPipeline : ChannelPipeline = {
2010-02-16 15:39:09 +01:00
val timeout = new ReadTimeoutHandler ( timer , RemoteClient . READ_TIMEOUT )
val lenDec = new LengthFieldBasedFrameDecoder ( 1048576 , 0 , 4 , 0 , 4 )
val lenPrep = new LengthFieldPrepender ( 4 )
val protobufDec = new ProtobufDecoder ( RemoteReply . getDefaultInstance )
val protobufEnc = new ProtobufEncoder
2009-12-30 09:24:10 +01:00
val zipCodec = RemoteServer . COMPRESSION_SCHEME match {
2010-02-16 15:39:09 +01:00
case "zlib" => Some ( Codec ( new ZlibEncoder ( RemoteServer . ZLIB_COMPRESSION_LEVEL ) , new ZlibDecoder ) )
2009-12-30 09:24:10 +01:00
//case "lzf" => Some(Codec(new LzfEncoder, new LzfDecoder))
case _ => None
}
val remoteClient = new RemoteClientHandler ( name , futures , supervisors , bootstrap , remoteAddress , timer , client )
2010-02-15 16:21:26 +01:00
val stages : Array [ ChannelHandler ] =
2010-02-16 15:39:09 +01:00
zipCodec . map ( codec => Array ( timeout , codec . decoder , lenDec , protobufDec , codec . encoder , lenPrep , protobufEnc , remoteClient ) )
. getOrElse ( Array ( timeout , lenDec , protobufDec , lenPrep , protobufEnc , remoteClient ) )
2009-12-30 09:24:10 +01:00
new StaticChannelPipeline ( stages : _ * )
2009-06-24 15:12:47 +02:00
}
}
2009-07-27 21:21:28 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2010-02-16 15:39:09 +01:00
@ChannelPipelineCoverage { val value = "all" }
2009-08-11 12:16:50 +02:00
class RemoteClientHandler ( val name : String ,
val futures : ConcurrentMap [ Long , CompletableFutureResult ] ,
2009-10-22 11:14:36 +02:00
val supervisors : ConcurrentMap [ String , Actor ] ,
2009-12-14 19:22:37 +01:00
val bootstrap : ClientBootstrap ,
2009-12-29 14:24:48 +01:00
val remoteAddress : SocketAddress ,
2009-12-30 08:36:24 +01:00
val timer : HashedWheelTimer ,
val client : RemoteClient )
2010-02-16 15:39:09 +01:00
extends SimpleChannelUpstreamHandler with Logging {
2009-11-23 15:19:53 +01:00
import Actor.Sender.Self
2009-06-24 15:12:47 +02:00
override def handleUpstream ( ctx : ChannelHandlerContext , event : ChannelEvent ) = {
2009-11-21 20:51:03 +01:00
if ( event . isInstanceOf [ ChannelStateEvent ] &&
event . asInstanceOf [ ChannelStateEvent ] . getState != ChannelState . INTEREST_OPS ) {
2009-06-24 15:12:47 +02:00
log . debug ( event . toString )
}
super . handleUpstream ( ctx , event )
}
override def messageReceived ( ctx : ChannelHandlerContext , event : MessageEvent ) {
try {
val result = event . getMessage
if ( result . isInstanceOf [ RemoteReply ] ) {
val reply = result . asInstanceOf [ RemoteReply ]
2009-10-22 11:14:36 +02:00
log . debug ( "Remote client received RemoteReply[\n%s]" , reply . toString )
2009-07-13 00:17:57 +02:00
val future = futures . get ( reply . getId )
2009-07-18 00:16:32 +02:00
if ( reply . getIsSuccessful ) {
2009-07-23 20:01:37 +02:00
val message = RemoteProtocolBuilder . getMessage ( reply )
2009-07-18 00:16:32 +02:00
future . completeWithResult ( message )
} else {
2009-10-22 11:14:36 +02:00
if ( reply . hasSupervisorUuid ( ) ) {
2009-07-18 00:16:32 +02:00
val supervisorUuid = reply . getSupervisorUuid
2009-07-01 15:29:06 +02:00
if ( ! supervisors . containsKey ( supervisorUuid ) ) throw new IllegalStateException ( "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found" )
val supervisedActor = supervisors . get ( supervisorUuid )
2009-10-28 13:20:28 +01:00
if ( ! supervisedActor . _supervisor . isDefined ) throw new IllegalStateException ( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed" )
else supervisedActor . _supervisor . get ! Exit ( supervisedActor , parseException ( reply ) )
2009-07-01 15:29:06 +02:00
}
2009-07-23 20:01:37 +02:00
future . completeWithException ( null , parseException ( reply ) )
2009-07-01 15:29:06 +02:00
}
2009-08-11 12:16:50 +02:00
futures . remove ( reply . getId )
2009-07-01 15:29:06 +02:00
} else throw new IllegalArgumentException ( "Unknown message received in remote client handler: " + result )
2009-06-24 15:12:47 +02:00
} catch {
2009-07-01 15:29:06 +02:00
case e : Exception =>
log . error ( "Unexpected exception in remote client handler: %s" , e )
throw e
2009-06-24 15:12:47 +02:00
}
2010-02-15 16:21:26 +01:00
}
2009-06-24 15:12:47 +02:00
2010-02-15 16:21:26 +01:00
override def channelClosed ( ctx : ChannelHandlerContext , event : ChannelStateEvent ) = if ( client . isRunning ) {
2009-12-14 19:22:37 +01:00
timer . newTimeout ( new TimerTask ( ) {
2009-10-22 11:14:36 +02:00
def run ( timeout : Timeout ) = {
2009-12-29 14:24:48 +01:00
log . debug ( "Remote client reconnecting to [%s]" , remoteAddress )
2009-12-30 08:36:24 +01:00
client . connection = bootstrap . connect ( remoteAddress )
// Wait until the connection attempt succeeds or fails.
client . connection . awaitUninterruptibly
if ( ! client . connection . isSuccess ) log . error ( client . connection . getCause , "Reconnection to [%s] has failed" , remoteAddress )
2009-10-22 11:14:36 +02:00
}
} , RemoteClient . RECONNECT_DELAY , TimeUnit . MILLISECONDS )
}
override def channelConnected ( ctx : ChannelHandlerContext , event : ChannelStateEvent ) =
log . debug ( "Remote client connected to [%s]" , ctx . getChannel . getRemoteAddress )
override def channelDisconnected ( ctx : ChannelHandlerContext , event : ChannelStateEvent ) =
log . debug ( "Remote client disconnected from [%s]" , ctx . getChannel . getRemoteAddress ) ;
override def exceptionCaught ( ctx : ChannelHandlerContext , event : ExceptionEvent ) = {
2009-11-22 15:25:16 +01:00
log . error ( event . getCause , "Unexpected exception from downstream in remote client" )
2009-06-24 15:12:47 +02:00
event . getChannel . close
}
2009-07-23 20:01:37 +02:00
private def parseException ( reply : RemoteReply ) = {
val exception = reply . getException
val exceptionType = Class . forName ( exception . substring ( 0 , exception . indexOf ( '$' ) ) )
val exceptionMessage = exception . substring ( exception . indexOf ( '$' ) + 1 , exception . length )
exceptionType
2010-02-16 15:39:09 +01:00
. getConstructor ( Array [ Class [ _ ] ] ( classOf [ String ] ) : _ * )
. newInstance ( exceptionMessage ) . asInstanceOf [ Throwable ]
2009-07-23 20:01:37 +02:00
}
2009-06-24 15:12:47 +02:00
}