2009-06-24 15:12:47 +02:00
/**
 * Copyright (C) 2009 Scalable Solutions.
 */
package se.scalablesolutions.akka.kernel.nio
import java.net.InetSocketAddress
import java.util.concurrent. { Executors , ConcurrentMap , ConcurrentHashMap }
2009-07-01 15:29:06 +02:00
import kernel.actor. { Exit , Actor }
2009-06-30 16:01:50 +02:00
import kernel.reactor. { DefaultCompletableFutureResult , CompletableFutureResult }
import kernel.util.Logging
2009-06-24 15:12:47 +02:00
2009-07-01 15:29:06 +02:00
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
2009-06-24 15:12:47 +02:00
import org.jboss.netty.handler.codec.serialization. { ObjectEncoder , ObjectDecoder }
import org.jboss.netty.bootstrap.ClientBootstrap
2009-07-02 13:23:03 +02:00
import scala.collection.mutable.HashMap
2009-07-01 15:29:06 +02:00
/**
 * Registry of remote clients, keyed by "hostname:port".
 *
 * A client is created and connected the first time an address is requested and the
 * same instance is handed out for every later request for that address.
 */
object RemoteClient extends Logging {
  // Only accessed from within `clientFor`, which holds this object's monitor.
  private val clients = new HashMap[String, RemoteClient]

  /**
   * Returns the client for the given address, creating, connecting and caching it
   * on first use.
   *
   * @param address the remote endpoint; its host name and port form the cache key
   * @return the cached client for that endpoint, or a newly connected one
   */
  def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
    val hostname = address.getHostName
    val port = address.getPort
    val hash = hostname + ":" + port
    clients.get(hash) match {
      case Some(existing) => existing
      case None =>
        val client = new RemoteClient(hostname, port)
        client.connect
        // The previous `clients + hash -> client` parsed as `(clients + hash) -> client`:
        // it built and discarded a tuple, so the client was never cached and every call
        // opened a new connection. `update` stores the entry explicitly.
        clients(hash) = client
        client
    }
  }
}
2009-06-24 15:12:47 +02:00
2009-07-02 13:23:03 +02:00
/**
 * Client for a single remote endpoint, built on a Netty `ClientBootstrap`.
 *
 * Requests are written to the channel opened by `connect`. For requests that are not
 * one-way, a future is registered under the request id and handed to the shared
 * `ObjectClientHandler`, which looks it up again by the id of the incoming reply.
 *
 * @param hostname host name of the remote endpoint
 * @param port     port of the remote endpoint
 */
class RemoteClient(hostname: String, port: Int) extends Logging {
  // True only while a successfully established connection is in use.
  @volatile private var isRunning = false

  // Pending request futures keyed by request id; shared with the channel handler.
  private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
  // Supervised actors keyed by their supervisor's uuid; shared with the channel handler.
  private val supervisors = new ConcurrentHashMap[String, Actor]

  // TODO is this Netty channelFactory and other options always the best or should it be configurable?
  private val channelFactory = new NioClientSocketChannelFactory(
    Executors.newCachedThreadPool,
    Executors.newCachedThreadPool)
  private val bootstrap = new ClientBootstrap(channelFactory)
  private val handler = new ObjectClientHandler(futures, supervisors)

  bootstrap.getPipeline.addLast("handler", handler)
  bootstrap.setOption("tcpNoDelay", true)
  bootstrap.setOption("keepAlive", true)

  // Result of the most recent connection attempt; unset until `connect` is first called.
  private var connection: ChannelFuture = _

  /**
   * Connects to the remote endpoint unless the client is already running, blocking
   * until the attempt has succeeded or failed.
   *
   * The client is marked as running only when the attempt succeeded, so after a
   * failure `send` reports the problem immediately and `connect` can be retried.
   */
  def connect = synchronized {
    if (!isRunning) {
      connection = bootstrap.connect(new InetSocketAddress(hostname, port))
      log.info("Starting remote client connection to [%s:%s]", hostname, port)
      // Wait until the connection attempt succeeds or fails.
      connection.awaitUninterruptibly
      if (connection.isSuccess) isRunning = true
      else {
        val cause = connection.getCause
        log.error("Remote connection to [%s:%s] has failed due to [%s]", hostname, port, cause)
        // A cancelled attempt carries no cause, so guard before printing.
        if (cause != null) cause.printStackTrace
      }
    }
  }

  /**
   * Closes the channel and releases the Netty resources if the client is running;
   * does nothing otherwise.
   *
   * The guard used to be inverted (`!isRunning`), so a running client was never shut
   * down and a never-connected one dereferenced the unset `connection`.
   */
  def shutdown = synchronized {
    if (isRunning) {
      isRunning = false
      // Request the close and wait for it, rather than waiting for the peer to drop the connection.
      connection.getChannel.close.awaitUninterruptibly
      channelFactory.releaseExternalResources
    }
  }

  /**
   * Writes the request to the remote endpoint.
   *
   * @param request the request to send; proxied arguments are escaped first
   * @return `None` for a one-way request, otherwise `Some` future registered under the request id
   * @throws IllegalStateException if the client is not running
   */
  def send(request: RemoteRequest): Option[CompletableFutureResult] = {
    if (!isRunning) throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
    val escapedRequest = escapeRequest(request)
    if (escapedRequest.isOneWay) {
      connection.getChannel.write(escapedRequest)
      None
    } else {
      futures.synchronized {
        val futureResult = new DefaultCompletableFutureResult(request.timeout)
        // Register the future before writing so a reply always finds it.
        futures.put(request.id, futureResult)
        connection.getChannel.write(escapedRequest)
        Some(futureResult)
      }
    }
  }

  /**
   * Registers the actor under the uuid of its supervisor, unless an entry for that
   * uuid already exists.
   *
   * @throws IllegalStateException if the actor has no supervisor
   */
  def registerSupervisorForActor(actor: Actor) = actor.supervisor match {
    case Some(supervisor) => supervisors.putIfAbsent(supervisor.uuid, actor)
    case None => throw new IllegalStateException("Can't register supervisor for " + actor + " since it is not under supervision")
  }

  /**
   * Removes the entry registered under the uuid of the actor's supervisor.
   *
   * @throws IllegalStateException if the actor has no supervisor
   */
  def deregisterSupervisorForActor(actor: Actor) = actor.supervisor match {
    case Some(supervisor) => supervisors.remove(supervisor.uuid)
    case None => throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
  }

  /** Removes the entry registered under the given supervisor uuid. */
  def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid)

  /**
   * Replaces every argument whose class name contains "$$ProxiedByAW" with a
   * `ProxyWrapper` carrying the name of that class's superclass. Requests whose
   * message is not an object array are returned unchanged.
   */
  private def escapeRequest(request: RemoteRequest) = request.message match {
    case rawArgs: Array[Object] =>
      // Null arguments are passed through untouched instead of failing on `getClass`.
      def isProxied(arg: Object) = arg != null && arg.getClass.getName.contains("$$ProxiedByAW")
      val args = rawArgs.toList
      val escapedArgs: List[Object] = args.map { arg =>
        if (isProxied(arg)) new ProxyWrapper(arg.getClass.getSuperclass.getName) else arg
      }
      val isEscaped = args.exists(arg => isProxied(arg))
      request.cloneWithNewMessage(escapedArgs, isEscaped)
    case _ => request
  }
}
/**
 * Upstream channel handler for the remote client.
 *
 * Installs the object encoder and decoder on every opened channel, completes the
 * pending future matching each incoming `RemoteReply`, and sends an `Exit` message to
 * the supervisor of the registered actor when a failed reply names a supervisor uuid.
 *
 * @param futures     pending futures keyed by request id
 * @param supervisors supervised actors keyed by supervisor uuid
 */
@ChannelPipelineCoverage { val value = "all" }
class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult],
                          val supervisors: ConcurrentMap[String, Actor])
  extends SimpleChannelUpstreamHandler with Logging {

  /** Logs every channel state event other than INTEREST_OPS at debug level, then delegates to the superclass. */
  override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
    event match {
      case stateEvent: ChannelStateEvent if stateEvent.getState != ChannelState.INTEREST_OPS =>
        log.debug(event.toString)
      case _ =>
    }
    super.handleUpstream(ctx, event)
  }

  override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
    // Add encoder and decoder as soon as a new channel is created so that
    // a Java object is serialized and deserialized.
    event.getChannel.getPipeline.addFirst("encoder", new ObjectEncoder)
    event.getChannel.getPipeline.addFirst("decoder", new ObjectDecoder)
  }

  /** Currently does nothing; sending an initial message on connect is commented out. */
  override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent): Unit = {
    // Send the first message if this handler is a client-side handler.
    // if (!firstMessage.isEmpty) e.getChannel.write(firstMessage)
  }

  /**
   * Handles an incoming `RemoteReply`: completes and unregisters the matching future,
   * notifying the supervisor first when the reply reports a failure.
   *
   * Any other message type is rejected with an `IllegalArgumentException`. Exceptions
   * are logged and rethrown.
   */
  override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent): Unit = {
    try {
      event.getMessage match {
        case reply: RemoteReply =>
          // Claim the pending future in one atomic step so it is always unregistered,
          // even if the supervisor handling below throws.
          val future = futures.remove(reply.id)
          // A reply without a pending future used to throw a NullPointerException, which
          // closed the channel; it is now logged and otherwise ignored.
          if (future == null) log.error("Received remote reply with id [" + reply.id + "] but no pending future was found")
          //val tx = reply.tx
          //if (reply.successful) future.completeWithResult((reply.message, tx))
          if (reply.successful) {
            if (future != null) future.completeWithResult(reply.message)
          } else {
            try {
              notifySupervisor(reply)
            } finally {
              // Complete the future even when supervisor handling fails, so the caller
              // sees the remote exception instead of waiting for the timeout.
              if (future != null) future.completeWithException(null, reply.exception)
            }
          }
        case unknown =>
          throw new IllegalArgumentException("Unknown message received in remote client handler: " + unknown)
      }
    } catch {
      case e: Exception =>
        log.error("Unexpected exception in remote client handler: %s", e)
        throw e
    }
  }

  /** Logs the cause and closes the channel the exception was raised on. */
  override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent): Unit = {
    log.error("Unexpected exception from downstream in remote client: %s", event.getCause)
    event.getChannel.close
  }

  /**
   * Sends `Exit` to the supervisor of the actor registered under the reply's supervisor
   * uuid. Does nothing when the reply names no supervisor uuid.
   *
   * @throws IllegalStateException if no actor is registered for the uuid, or the
   *                               registered actor no longer has a supervisor
   */
  private def notifySupervisor(reply: RemoteReply): Unit = reply.supervisorUuid match {
    case Some(supervisorUuid) =>
      // A single lookup avoids the containsKey-then-get race on the concurrent map.
      val supervisedActor = supervisors.get(supervisorUuid)
      if (supervisedActor == null) throw new IllegalStateException("Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
      supervisedActor.supervisor match {
        case Some(supervisor) => supervisor ! Exit(supervisedActor, reply.exception)
        case None => throw new IllegalStateException("Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
      }
    case None =>
  }
}