2009-06-24 15:12:47 +02:00
/* *
2009-12-27 16:01:53 +01:00
* Copyright ( C ) 2009 - 2010 Scalable Solutions AB < http : //scalablesolutions.se>
2009-06-24 15:12:47 +02:00
*/
2009-12-16 23:20:15 +01:00
package se.scalablesolutions.akka.remote
2009-06-24 15:12:47 +02:00
2009-10-22 11:14:36 +02:00
import scala.collection.mutable.HashMap
2009-06-24 15:12:47 +02:00
2009-12-16 23:20:15 +01:00
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol. { RemoteRequest , RemoteReply }
2009-10-22 11:14:36 +02:00
import se.scalablesolutions.akka.actor. { Exit , Actor }
import se.scalablesolutions.akka.dispatch. { DefaultCompletableFutureResult , CompletableFutureResult }
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.Config.config
2009-07-18 00:16:32 +02:00
2009-07-01 15:29:06 +02:00
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
2009-11-24 17:41:08 +01:00
import org.jboss.netty.bootstrap.ClientBootstrap
2009-07-18 00:16:32 +02:00
import org.jboss.netty.handler.codec.frame. { LengthFieldBasedFrameDecoder , LengthFieldPrepender }
2009-11-24 17:41:08 +01:00
import org.jboss.netty.handler.codec.compression. { ZlibDecoder , ZlibEncoder }
2009-07-18 00:16:32 +02:00
import org.jboss.netty.handler.codec.protobuf. { ProtobufDecoder , ProtobufEncoder }
2009-10-22 11:14:36 +02:00
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
import org.jboss.netty.util. { TimerTask , Timeout , HashedWheelTimer }
2009-06-24 15:12:47 +02:00
2009-12-29 14:24:48 +01:00
import java.net. { SocketAddress , InetSocketAddress }
2009-10-22 11:14:36 +02:00
import java.util.concurrent. { TimeUnit , Executors , ConcurrentMap , ConcurrentHashMap }
2009-12-27 08:24:11 +01:00
import java.util.concurrent.atomic.AtomicLong
import org.codehaus.aspectwerkz.proxy.Uuid
2009-12-27 22:56:55 +01:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-12-27 08:24:11 +01:00
object RemoteRequestIdFactory {
private val nodeId = Uuid . newUuid
private val id = new AtomicLong
def nextId : Long = id . getAndIncrement + nodeId
}
2009-07-23 20:01:37 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-07-01 15:29:06 +02:00
object RemoteClient extends Logging {
2009-10-22 11:14:36 +02:00
val READ_TIMEOUT = config . getInt ( "akka.remote.client.read-timeout" , 10000 )
val RECONNECT_DELAY = config . getInt ( "akka.remote.client.reconnect-delay" , 5000 )
2009-07-02 13:23:03 +02:00
private val clients = new HashMap [ String , RemoteClient ]
2009-10-22 11:14:36 +02:00
2009-07-02 13:23:03 +02:00
def clientFor ( address : InetSocketAddress ) : RemoteClient = synchronized {
val hostname = address . getHostName
val port = address . getPort
2009-10-22 11:14:36 +02:00
val hash = hostname + ':' + port
2009-07-02 13:23:03 +02:00
if ( clients . contains ( hash ) ) clients ( hash )
else {
val client = new RemoteClient ( hostname , port )
client . connect
2009-07-03 17:15:36 +02:00
clients += hash -> client
2009-07-02 13:23:03 +02:00
client
}
}
2009-12-14 19:22:37 +01:00
2009-12-27 22:56:55 +01:00
/* *
* Clean - up all open connections .
*/
2009-12-14 19:22:37 +01:00
def shutdownAll ( ) = synchronized {
2009-12-27 22:56:55 +01:00
clients . foreach ( { case ( addr , client ) => client . shutdown } )
clients . clear
2009-12-14 19:22:37 +01:00
}
2009-07-02 13:23:03 +02:00
}
2009-06-24 15:12:47 +02:00
2009-07-23 20:01:37 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-07-02 13:23:03 +02:00
class RemoteClient ( hostname : String , port : Int ) extends Logging {
2009-08-11 12:16:50 +02:00
val name = "RemoteClient@" + hostname
2009-10-22 11:14:36 +02:00
2009-06-25 13:07:58 +02:00
@volatile private var isRunning = false
2009-06-24 15:12:47 +02:00
private val futures = new ConcurrentHashMap [ Long , CompletableFutureResult ]
2009-07-01 15:29:06 +02:00
private val supervisors = new ConcurrentHashMap [ String , Actor ]
2009-06-24 15:12:47 +02:00
private val channelFactory = new NioClientSocketChannelFactory (
Executors . newCachedThreadPool ,
Executors . newCachedThreadPool )
private val bootstrap = new ClientBootstrap ( channelFactory )
2009-12-14 19:22:37 +01:00
private val timer = new HashedWheelTimer
2009-12-29 14:24:48 +01:00
private val remoteAddress = new InetSocketAddress ( hostname , port )
2009-12-14 19:22:37 +01:00
2009-12-29 14:24:48 +01:00
bootstrap . setPipelineFactory ( new RemoteClientPipelineFactory ( name , futures , supervisors , bootstrap , remoteAddress , timer ) )
2009-06-24 15:12:47 +02:00
bootstrap . setOption ( "tcpNoDelay" , true )
bootstrap . setOption ( "keepAlive" , true )
2009-06-25 13:07:58 +02:00
private var connection : ChannelFuture = _
2009-06-24 15:12:47 +02:00
2009-06-25 13:07:58 +02:00
def connect = synchronized {
if ( ! isRunning ) {
2009-12-29 14:24:48 +01:00
connection = bootstrap . connect ( remoteAddress )
2009-07-02 13:23:03 +02:00
log . info ( "Starting remote client connection to [%s:%s]" , hostname , port )
2009-06-25 13:07:58 +02:00
// Wait until the connection attempt succeeds or fails.
connection . awaitUninterruptibly
if ( ! connection . isSuccess ) {
2009-11-22 15:25:16 +01:00
log . error ( connection . getCause , "Remote connection to [%s:%s] has failed" , hostname , port )
2009-06-25 13:07:58 +02:00
}
isRunning = true
}
2009-06-24 15:12:47 +02:00
}
2009-06-25 13:07:58 +02:00
def shutdown = synchronized {
if ( ! isRunning ) {
connection . getChannel . getCloseFuture . awaitUninterruptibly
channelFactory . releaseExternalResources
}
2009-12-27 22:56:55 +01:00
timer . stop
2009-06-24 15:12:47 +02:00
}
2009-12-27 22:56:55 +01:00
def send ( request : RemoteRequest , senderFuture : Option [ CompletableFutureResult ] ) : Option [ CompletableFutureResult ] = if ( isRunning ) {
2009-07-18 00:16:32 +02:00
if ( request . getIsOneWay ) {
connection . getChannel . write ( request )
2009-06-30 16:01:50 +02:00
None
2009-06-24 15:12:47 +02:00
} else {
futures . synchronized {
2009-12-27 22:56:55 +01:00
val futureResult = if ( senderFuture . isDefined ) senderFuture . get
else new DefaultCompletableFutureResult ( request . getTimeout )
2009-07-18 00:16:32 +02:00
futures . put ( request . getId , futureResult )
connection . getChannel . write ( request )
2009-06-30 16:01:50 +02:00
Some ( futureResult )
2009-06-24 15:12:47 +02:00
}
}
2009-07-01 15:29:06 +02:00
} else throw new IllegalStateException ( "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it." )
def registerSupervisorForActor ( actor : Actor ) =
2009-10-28 13:20:28 +01:00
if ( ! actor . _supervisor . isDefined ) throw new IllegalStateException ( "Can't register supervisor for " + actor + " since it is not under supervision" )
else supervisors . putIfAbsent ( actor . _supervisor . get . uuid , actor )
2009-07-01 15:29:06 +02:00
def deregisterSupervisorForActor ( actor : Actor ) =
2009-10-28 13:20:28 +01:00
if ( ! actor . _supervisor . isDefined ) throw new IllegalStateException ( "Can't unregister supervisor for " + actor + " since it is not under supervision" )
else supervisors . remove ( actor . _supervisor . get . uuid )
2009-07-01 15:29:06 +02:00
def deregisterSupervisorWithUuid ( uuid : String ) = supervisors . remove ( uuid )
2009-07-18 00:16:32 +02:00
}
2009-06-24 15:12:47 +02:00
2009-07-27 21:21:28 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-08-11 12:16:50 +02:00
class RemoteClientPipelineFactory ( name : String ,
futures : ConcurrentMap [ Long , CompletableFutureResult ] ,
2009-10-22 11:14:36 +02:00
supervisors : ConcurrentMap [ String , Actor ] ,
2009-12-14 19:22:37 +01:00
bootstrap : ClientBootstrap ,
2009-12-29 14:24:48 +01:00
remoteAddress : SocketAddress ,
timer : HashedWheelTimer ) extends ChannelPipelineFactory {
2009-07-18 00:16:32 +02:00
def getPipeline : ChannelPipeline = {
2009-10-22 11:14:36 +02:00
val pipeline = Channels . pipeline ( )
2009-12-14 19:22:37 +01:00
pipeline . addLast ( "timeout" , new ReadTimeoutHandler ( timer , RemoteClient . READ_TIMEOUT ) )
2009-11-22 14:32:27 +01:00
RemoteServer . COMPRESSION_SCHEME match {
case "zlib" => pipeline . addLast ( "zlibDecoder" , new ZlibDecoder )
//case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder)
case _ => { } // no compression
}
2009-10-22 11:14:36 +02:00
pipeline . addLast ( "frameDecoder" , new LengthFieldBasedFrameDecoder ( 1048576 , 0 , 4 , 0 , 4 ) )
pipeline . addLast ( "protobufDecoder" , new ProtobufDecoder ( RemoteReply . getDefaultInstance ) )
2009-11-22 14:32:27 +01:00
RemoteServer . COMPRESSION_SCHEME match {
case "zlib" => pipeline . addLast ( "zlibEncoder" , new ZlibEncoder ( RemoteServer . ZLIB_COMPRESSION_LEVEL ) )
//case "lzf" => pipeline.addLast("lzfEncoder", new LzfEncoder)
case _ => { } // no compression
}
2009-10-22 11:14:36 +02:00
pipeline . addLast ( "frameEncoder" , new LengthFieldPrepender ( 4 ) )
pipeline . addLast ( "protobufEncoder" , new ProtobufEncoder ( ) )
2009-12-29 14:24:48 +01:00
pipeline . addLast ( "handler" , new RemoteClientHandler ( name , futures , supervisors , bootstrap , remoteAddress , timer ) )
2009-10-22 11:14:36 +02:00
pipeline
2009-06-24 15:12:47 +02:00
}
}
2009-07-27 21:21:28 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-06-24 15:12:47 +02:00
@ChannelPipelineCoverage { val value = "all" }
2009-08-11 12:16:50 +02:00
class RemoteClientHandler ( val name : String ,
val futures : ConcurrentMap [ Long , CompletableFutureResult ] ,
2009-10-22 11:14:36 +02:00
val supervisors : ConcurrentMap [ String , Actor ] ,
2009-12-14 19:22:37 +01:00
val bootstrap : ClientBootstrap ,
2009-12-29 14:24:48 +01:00
val remoteAddress : SocketAddress ,
val timer : HashedWheelTimer )
2009-07-01 15:29:06 +02:00
extends SimpleChannelUpstreamHandler with Logging {
2009-11-23 15:19:53 +01:00
import Actor.Sender.Self
2009-06-24 15:12:47 +02:00
override def handleUpstream ( ctx : ChannelHandlerContext , event : ChannelEvent ) = {
2009-11-21 20:51:03 +01:00
if ( event . isInstanceOf [ ChannelStateEvent ] &&
event . asInstanceOf [ ChannelStateEvent ] . getState != ChannelState . INTEREST_OPS ) {
2009-06-24 15:12:47 +02:00
log . debug ( event . toString )
}
super . handleUpstream ( ctx , event )
}
override def messageReceived ( ctx : ChannelHandlerContext , event : MessageEvent ) {
try {
val result = event . getMessage
if ( result . isInstanceOf [ RemoteReply ] ) {
val reply = result . asInstanceOf [ RemoteReply ]
2009-10-22 11:14:36 +02:00
log . debug ( "Remote client received RemoteReply[\n%s]" , reply . toString )
2009-07-13 00:17:57 +02:00
val future = futures . get ( reply . getId )
2009-07-18 00:16:32 +02:00
if ( reply . getIsSuccessful ) {
2009-07-23 20:01:37 +02:00
val message = RemoteProtocolBuilder . getMessage ( reply )
2009-07-18 00:16:32 +02:00
future . completeWithResult ( message )
} else {
2009-10-22 11:14:36 +02:00
if ( reply . hasSupervisorUuid ( ) ) {
2009-07-18 00:16:32 +02:00
val supervisorUuid = reply . getSupervisorUuid
2009-07-01 15:29:06 +02:00
if ( ! supervisors . containsKey ( supervisorUuid ) ) throw new IllegalStateException ( "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found" )
val supervisedActor = supervisors . get ( supervisorUuid )
2009-10-28 13:20:28 +01:00
if ( ! supervisedActor . _supervisor . isDefined ) throw new IllegalStateException ( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed" )
else supervisedActor . _supervisor . get ! Exit ( supervisedActor , parseException ( reply ) )
2009-07-01 15:29:06 +02:00
}
2009-07-23 20:01:37 +02:00
future . completeWithException ( null , parseException ( reply ) )
2009-07-01 15:29:06 +02:00
}
2009-08-11 12:16:50 +02:00
futures . remove ( reply . getId )
2009-07-01 15:29:06 +02:00
} else throw new IllegalArgumentException ( "Unknown message received in remote client handler: " + result )
2009-06-24 15:12:47 +02:00
} catch {
2009-07-01 15:29:06 +02:00
case e : Exception =>
log . error ( "Unexpected exception in remote client handler: %s" , e )
throw e
2009-06-24 15:12:47 +02:00
}
}
2009-10-22 11:14:36 +02:00
override def channelClosed ( ctx : ChannelHandlerContext , event : ChannelStateEvent ) = {
2009-12-14 19:22:37 +01:00
timer . newTimeout ( new TimerTask ( ) {
2009-10-22 11:14:36 +02:00
def run ( timeout : Timeout ) = {
2009-12-29 14:24:48 +01:00
log . debug ( "Remote client reconnecting to [%s]" , remoteAddress )
bootstrap . connect ( remoteAddress )
2009-10-22 11:14:36 +02:00
}
} , RemoteClient . RECONNECT_DELAY , TimeUnit . MILLISECONDS )
}
override def channelConnected ( ctx : ChannelHandlerContext , event : ChannelStateEvent ) =
log . debug ( "Remote client connected to [%s]" , ctx . getChannel . getRemoteAddress )
override def channelDisconnected ( ctx : ChannelHandlerContext , event : ChannelStateEvent ) =
log . debug ( "Remote client disconnected from [%s]" , ctx . getChannel . getRemoteAddress ) ;
override def exceptionCaught ( ctx : ChannelHandlerContext , event : ExceptionEvent ) = {
2009-11-22 15:25:16 +01:00
log . error ( event . getCause , "Unexpected exception from downstream in remote client" )
2009-06-24 15:12:47 +02:00
event . getChannel . close
}
2009-07-23 20:01:37 +02:00
private def parseException ( reply : RemoteReply ) = {
val exception = reply . getException
val exceptionType = Class . forName ( exception . substring ( 0 , exception . indexOf ( '$' ) ) )
val exceptionMessage = exception . substring ( exception . indexOf ( '$' ) + 1 , exception . length )
exceptionType
. getConstructor ( Array [ Class [ _ ] ] ( classOf [ String ] ) : _ * )
. newInstance ( exceptionMessage ) . asInstanceOf [ Throwable ]
}
2009-06-24 15:12:47 +02:00
}