2009-06-24 15:12:47 +02:00
/* *
* Copyright ( C ) 2009 Scalable Solutions .
*/
package se.scalablesolutions.akka.kernel.nio
import java.net.InetSocketAddress
import java.util.concurrent. { Executors , ConcurrentMap , ConcurrentHashMap }
2009-07-23 20:01:37 +02:00
import protobuf.RemoteProtocol. { RemoteRequest , RemoteReply }
2009-07-01 15:29:06 +02:00
import kernel.actor. { Exit , Actor }
2009-06-30 16:01:50 +02:00
import kernel.reactor. { DefaultCompletableFutureResult , CompletableFutureResult }
2009-07-23 20:01:37 +02:00
import serialization. { Serializer , Serializable , SerializationProtocol }
import kernel.util.Logging
2009-07-18 00:16:32 +02:00
import org.jboss.netty.bootstrap.ClientBootstrap
2009-07-01 15:29:06 +02:00
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
2009-07-18 00:16:32 +02:00
import org.jboss.netty.handler.codec.frame. { LengthFieldBasedFrameDecoder , LengthFieldPrepender }
import org.jboss.netty.handler.codec.protobuf. { ProtobufDecoder , ProtobufEncoder }
2009-06-24 15:12:47 +02:00
2009-07-02 13:23:03 +02:00
import scala.collection.mutable.HashMap
2009-07-23 20:01:37 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-07-01 15:29:06 +02:00
object RemoteClient extends Logging {
2009-07-02 13:23:03 +02:00
private val clients = new HashMap [ String , RemoteClient ]
def clientFor ( address : InetSocketAddress ) : RemoteClient = synchronized {
val hostname = address . getHostName
val port = address . getPort
val hash = hostname + ":" + port
if ( clients . contains ( hash ) ) clients ( hash )
else {
val client = new RemoteClient ( hostname , port )
client . connect
2009-07-03 17:15:36 +02:00
clients += hash -> client
2009-07-02 13:23:03 +02:00
client
}
}
}
2009-06-24 15:12:47 +02:00
2009-07-23 20:01:37 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-07-02 13:23:03 +02:00
class RemoteClient ( hostname : String , port : Int ) extends Logging {
2009-06-25 13:07:58 +02:00
@volatile private var isRunning = false
2009-06-24 15:12:47 +02:00
private val futures = new ConcurrentHashMap [ Long , CompletableFutureResult ]
2009-07-01 15:29:06 +02:00
private val supervisors = new ConcurrentHashMap [ String , Actor ]
2009-06-24 15:12:47 +02:00
2009-07-01 15:29:06 +02:00
// TODO is this Netty channelFactory and other options always the best or should it be configurable?
2009-06-24 15:12:47 +02:00
private val channelFactory = new NioClientSocketChannelFactory (
Executors . newCachedThreadPool ,
Executors . newCachedThreadPool )
private val bootstrap = new ClientBootstrap ( channelFactory )
2009-07-18 00:16:32 +02:00
bootstrap . setPipelineFactory ( new RemoteClientPipelineFactory ( futures , supervisors ) )
2009-06-24 15:12:47 +02:00
bootstrap . setOption ( "tcpNoDelay" , true )
bootstrap . setOption ( "keepAlive" , true )
2009-06-25 13:07:58 +02:00
private var connection : ChannelFuture = _
2009-06-24 15:12:47 +02:00
2009-06-25 13:07:58 +02:00
def connect = synchronized {
if ( ! isRunning ) {
2009-07-02 13:23:03 +02:00
connection = bootstrap . connect ( new InetSocketAddress ( hostname , port ) )
log . info ( "Starting remote client connection to [%s:%s]" , hostname , port )
2009-06-25 13:07:58 +02:00
// Wait until the connection attempt succeeds or fails.
connection . awaitUninterruptibly
if ( ! connection . isSuccess ) {
2009-07-02 13:23:03 +02:00
log . error ( "Remote connection to [%s:%s] has failed due to [%s]" , hostname , port , connection . getCause )
2009-06-25 13:07:58 +02:00
connection . getCause . printStackTrace
}
isRunning = true
}
2009-06-24 15:12:47 +02:00
}
2009-06-25 13:07:58 +02:00
def shutdown = synchronized {
if ( ! isRunning ) {
connection . getChannel . getCloseFuture . awaitUninterruptibly
channelFactory . releaseExternalResources
}
2009-06-24 15:12:47 +02:00
}
2009-06-30 16:01:50 +02:00
def send ( request : RemoteRequest ) : Option [ CompletableFutureResult ] = if ( isRunning ) {
2009-07-18 00:16:32 +02:00
if ( request . getIsOneWay ) {
connection . getChannel . write ( request )
2009-06-30 16:01:50 +02:00
None
2009-06-24 15:12:47 +02:00
} else {
futures . synchronized {
2009-07-18 00:16:32 +02:00
val futureResult = new DefaultCompletableFutureResult ( request . getTimeout )
futures . put ( request . getId , futureResult )
connection . getChannel . write ( request )
2009-06-30 16:01:50 +02:00
Some ( futureResult )
2009-06-24 15:12:47 +02:00
}
}
2009-07-01 15:29:06 +02:00
} else throw new IllegalStateException ( "Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it." )
def registerSupervisorForActor ( actor : Actor ) =
if ( ! actor . supervisor . isDefined ) throw new IllegalStateException ( "Can't register supervisor for " + actor + " since it is not under supervision" )
else supervisors . putIfAbsent ( actor . supervisor . get . uuid , actor )
def deregisterSupervisorForActor ( actor : Actor ) =
if ( ! actor . supervisor . isDefined ) throw new IllegalStateException ( "Can't unregister supervisor for " + actor + " since it is not under supervision" )
else supervisors . remove ( actor . supervisor . get . uuid )
def deregisterSupervisorWithUuid ( uuid : String ) = supervisors . remove ( uuid )
2009-07-18 00:16:32 +02:00
}
2009-06-24 15:12:47 +02:00
2009-07-27 21:21:28 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-07-18 00:16:32 +02:00
class RemoteClientPipelineFactory ( futures : ConcurrentMap [ Long , CompletableFutureResult ] ,
supervisors : ConcurrentMap [ String , Actor ] ) extends ChannelPipelineFactory {
def getPipeline : ChannelPipeline = {
val p = Channels . pipeline ( )
p . addLast ( "frameDecoder" , new LengthFieldBasedFrameDecoder ( 1048576 , 0 , 4 , 0 , 4 ) ) ;
2009-07-23 20:01:37 +02:00
p . addLast ( "protobufDecoder" , new ProtobufDecoder ( RemoteReply . getDefaultInstance ) ) ;
2009-07-18 00:16:32 +02:00
p . addLast ( "frameEncoder" , new LengthFieldPrepender ( 4 ) ) ;
p . addLast ( "protobufEncoder" , new ProtobufEncoder ( ) ) ;
p . addLast ( "handler" , new RemoteClientHandler ( futures , supervisors ) )
p
2009-06-24 15:12:47 +02:00
}
}
2009-07-27 21:21:28 +02:00
/* *
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
2009-06-24 15:12:47 +02:00
@ChannelPipelineCoverage { val value = "all" }
2009-07-12 23:08:17 +02:00
class RemoteClientHandler ( val futures : ConcurrentMap [ Long , CompletableFutureResult ] ,
2009-07-01 15:29:06 +02:00
val supervisors : ConcurrentMap [ String , Actor ] )
extends SimpleChannelUpstreamHandler with Logging {
2009-06-24 15:12:47 +02:00
override def handleUpstream ( ctx : ChannelHandlerContext , event : ChannelEvent ) = {
if ( event . isInstanceOf [ ChannelStateEvent ] && event . asInstanceOf [ ChannelStateEvent ] . getState != ChannelState . INTEREST_OPS ) {
log . debug ( event . toString )
}
super . handleUpstream ( ctx , event )
}
override def messageReceived ( ctx : ChannelHandlerContext , event : MessageEvent ) {
try {
val result = event . getMessage
if ( result . isInstanceOf [ RemoteReply ] ) {
val reply = result . asInstanceOf [ RemoteReply ]
2009-07-18 00:16:32 +02:00
log . debug ( "Received RemoteReply[\n%s]" , reply . toString )
2009-07-13 00:17:57 +02:00
val future = futures . get ( reply . getId )
2009-07-18 00:16:32 +02:00
if ( reply . getIsSuccessful ) {
2009-07-23 20:01:37 +02:00
val message = RemoteProtocolBuilder . getMessage ( reply )
2009-07-18 00:16:32 +02:00
future . completeWithResult ( message )
} else {
if ( reply . hasSupervisorUuid ) {
val supervisorUuid = reply . getSupervisorUuid
2009-07-01 15:29:06 +02:00
if ( ! supervisors . containsKey ( supervisorUuid ) ) throw new IllegalStateException ( "Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found" )
val supervisedActor = supervisors . get ( supervisorUuid )
if ( ! supervisedActor . supervisor . isDefined ) throw new IllegalStateException ( "Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed" )
2009-07-23 20:01:37 +02:00
else supervisedActor . supervisor . get ! Exit ( supervisedActor , parseException ( reply ) )
2009-07-01 15:29:06 +02:00
}
2009-07-23 20:01:37 +02:00
future . completeWithException ( null , parseException ( reply ) )
2009-07-01 15:29:06 +02:00
}
2009-07-13 00:17:57 +02:00
futures . remove ( reply . getId )
2009-07-01 15:29:06 +02:00
} else throw new IllegalArgumentException ( "Unknown message received in remote client handler: " + result )
2009-06-24 15:12:47 +02:00
} catch {
2009-07-01 15:29:06 +02:00
case e : Exception =>
log . error ( "Unexpected exception in remote client handler: %s" , e )
throw e
2009-06-24 15:12:47 +02:00
}
}
override def exceptionCaught ( ctx : ChannelHandlerContext , event : ExceptionEvent ) {
2009-07-01 15:29:06 +02:00
log . error ( "Unexpected exception from downstream in remote client: %s" , event . getCause )
2009-07-18 00:16:32 +02:00
event . getCause . printStackTrace
2009-06-24 15:12:47 +02:00
event . getChannel . close
}
2009-07-23 20:01:37 +02:00
private def parseException ( reply : RemoteReply ) = {
val exception = reply . getException
val exceptionType = Class . forName ( exception . substring ( 0 , exception . indexOf ( '$' ) ) )
val exceptionMessage = exception . substring ( exception . indexOf ( '$' ) + 1 , exception . length )
exceptionType
. getConstructor ( Array [ Class [ _ ] ] ( classOf [ String ] ) : _ * )
. newInstance ( exceptionMessage ) . asInstanceOf [ Throwable ]
}
2009-06-24 15:12:47 +02:00
}