2009-06-24 15:12:47 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009 Scalable Solutions.
|
|
|
|
|
*/
|
|
|
|
|
|
2009-09-02 09:10:21 +02:00
|
|
|
package se.scalablesolutions.akka.nio
|
2009-06-24 15:12:47 +02:00
|
|
|
|
|
|
|
|
import java.lang.reflect.InvocationTargetException
|
|
|
|
|
import java.net.InetSocketAddress
|
|
|
|
|
import java.util.concurrent.{ConcurrentHashMap, Executors}
|
2009-07-02 13:23:03 +02:00
|
|
|
|
2009-10-06 00:07:27 +02:00
|
|
|
import se.scalablesolutions.akka.actor._
|
|
|
|
|
import se.scalablesolutions.akka.util._
|
|
|
|
|
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
|
|
|
|
|
import se.scalablesolutions.akka.Config.config
|
2009-07-18 00:16:32 +02:00
|
|
|
|
2009-06-24 15:12:47 +02:00
|
|
|
import org.jboss.netty.bootstrap.ServerBootstrap
|
|
|
|
|
import org.jboss.netty.channel._
|
|
|
|
|
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
|
2009-07-12 23:08:17 +02:00
|
|
|
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
|
|
|
|
|
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
|
|
|
|
|
2009-07-23 20:01:37 +02:00
|
|
|
/**
 * Instantiable entry point to the remote server. It holds no state of its own and simply
 * forwards to the RemoteServer singleton object.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteServer extends Logging {

  /** Starts the singleton remote server, passing no application class loader (None). */
  def start: Unit = {
    RemoteServer.start(None)
  }
}
|
|
|
|
|
|
2009-07-23 20:01:37 +02:00
|
|
|
/**
 * Singleton that owns the Netty server bootstrap for remote invocations.
 *
 * The bootstrap is configured and bound at most once: start is synchronized on this object
 * and does nothing when the server is already marked as running.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object RemoteServer extends Logging {
  // Values read from configuration; the second argument is passed as the fallback value
  // (presumably used when the key is missing - confirm against the Config implementation).
  val HOSTNAME = config.getString("akka.remote.hostname", "localhost")
  val PORT = config.getInt("akka.remote.port", 9999)
  val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000)

  // Derived from the configured HOSTNAME, not from the hostname later handed to start.
  val name = "RemoteServer@" + HOSTNAME

  @volatile private var isRunning = false
  // Not read or written anywhere else in this object.
  @volatile private var isConfigured = false

  private val factory = new NioServerSocketChannelFactory(
    Executors.newCachedThreadPool,
    Executors.newCachedThreadPool)

  // Not used anywhere else in this object; kept because its construction may matter.
  private val activeObjectFactory = new ActiveObjectFactory

  private val bootstrap = new ServerBootstrap(factory)

  /** Starts the server on the configured HOSTNAME and PORT without an application class loader. */
  def start: Unit = start(None)

  /**
   * Starts the server on the configured HOSTNAME and PORT.
   *
   * @param loader optional class loader forwarded to the pipeline factory
   */
  def start(loader: Option[ClassLoader]): Unit = start(HOSTNAME, PORT, loader) // previously the loader was dropped here

  /**
   * Starts the server on the given address without an application class loader.
   *
   * @param hostname host name or address to bind to
   * @param port     port to bind to
   */
  def start(hostname: String, port: Int): Unit = start(hostname, port, None)

  /**
   * Configures the bootstrap and binds it to the given address. Calls made while the server
   * is already running are ignored.
   *
   * @param hostname host name or address to bind to
   * @param port     port to bind to
   * @param loader   optional class loader handed to RemoteServerPipelineFactory
   */
  def start(hostname: String, port: Int, loader: Option[ClassLoader]): Unit = synchronized {
    if (!isRunning) {
      // Log and bind the address that was actually requested rather than the configured default.
      log.info("Starting remote server at [%s:%s]", hostname, port)
      bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader))
      // FIXME make these RemoteServer options configurable
      bootstrap.setOption("child.tcpNoDelay", true)
      bootstrap.setOption("child.keepAlive", true)
      bootstrap.setOption("child.reuseAddress", true)
      bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
      bootstrap.bind(new InetSocketAddress(hostname, port))
      isRunning = true
    }
  }
}
|
|
|
|
|
|
2009-07-27 21:21:28 +02:00
|
|
|
/**
 * Creates the channel pipeline used by the remote server.
 *
 * @param name   name handed to each RemoteServerHandler
 * @param loader optional application class loader handed to each RemoteServerHandler
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) extends ChannelPipelineFactory {

  /**
   * Builds a fresh pipeline whose stages are appended in this order: frame decoder, protobuf
   * decoder for RemoteRequest, length-field prepender, protobuf encoder, request handler.
   *
   * @return the assembled pipeline
   */
  def getPipeline: ChannelPipeline = {
    // Stage names and their order are significant; they are appended exactly as listed.
    // NOTE(review): the frame decoder arguments are presumably (maxFrameLength, lengthFieldOffset,
    // lengthFieldLength, lengthAdjustment, initialBytesToStrip) - confirm against the Netty version in use.
    val stages = List[(String, ChannelHandler)](
      ("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)),
      ("protobufDecoder", new ProtobufDecoder(RemoteRequest.getDefaultInstance)),
      ("frameEncoder", new LengthFieldPrepender(4)),
      ("protobufEncoder", new ProtobufEncoder),
      ("handler", new RemoteServerHandler(name, loader)))

    val pipeline = Channels.pipeline()
    stages.foreach { case (stageName, stage) => pipeline.addLast(stageName, stage) }
    pipeline
  }
}
|
|
|
|
|
|
2009-07-27 21:21:28 +02:00
|
|
|
/**
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
2009-06-30 16:01:50 +02:00
|
|
|
@ChannelPipelineCoverage { val value = "all" }
|
2009-08-11 12:16:50 +02:00
|
|
|
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
|
2009-06-24 15:12:47 +02:00
|
|
|
private val activeObjectFactory = new ActiveObjectFactory
|
|
|
|
|
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
|
2009-06-25 23:47:30 +02:00
|
|
|
private val actors = new ConcurrentHashMap[String, Actor]
|
2009-07-18 00:16:32 +02:00
|
|
|
|
2009-06-24 15:12:47 +02:00
|
|
|
/**
 * Logs channel state changes at debug level and then lets the superclass process the event.
 * State events whose state is INTEREST_OPS are not logged.
 *
 * @param ctx   the handler context, passed through to the superclass
 * @param event the upstream event
 */
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
  event match {
    case stateEvent: ChannelStateEvent if stateEvent.getState != ChannelState.INTEREST_OPS =>
      log.debug(event.toString)
    case _ => // any other event is forwarded without logging
  }
  super.handleUpstream(ctx, event)
}
|
|
|
|
|
|
2009-07-03 17:15:36 +02:00
|
|
|
/**
 * Handles an incoming message event. RemoteRequest messages are passed to handleRemoteRequest
 * together with the event's channel; messages of any other type are ignored.
 *
 * @param ctx   the handler context (unused here)
 * @param event the event carrying the decoded message
 * @throws IllegalStateException if the event carries a null message
 */
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
  event.getMessage match {
    case null =>
      throw new IllegalStateException("Message in remote MessageEvent is null: " + event)
    case request: RemoteRequest =>
      handleRemoteRequest(request, event.getChannel)
    case _ => // not a RemoteRequest: nothing to do
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Reports an exception raised on the channel: logs it at error level, prints its stack
 * trace and closes the channel the event belongs to.
 *
 * @param ctx   the handler context (unused here)
 * @param event the event carrying the cause and the affected channel
 */
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
  val cause = event.getCause
  log.error("Unexpected exception from remote downstream: %s", cause)
  cause.printStackTrace
  event.getChannel.close
}
|
|
|
|
|
|
|
|
|
|
/**
 * Logs the request at debug level and routes it: requests flagged as actor requests go to
 * dispatchToActor, all others go to dispatchToActiveObject.
 *
 * @param request the decoded remote request
 * @param channel the channel any reply is written to
 */
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
  log.debug("Received RemoteRequest[\n%s]", request.toString)
  val targetsActiveObject = !request.getIsActor
  if (targetsActiveObject) dispatchToActiveObject(request, channel)
  else dispatchToActor(request, channel)
}
|
|
|
|
|
|
2009-06-25 23:47:30 +02:00
|
|
|
/**
 * Delivers a remote request to the actor named by the request's target.
 *
 * The actor is obtained through createActor and started, and the payload is extracted with
 * RemoteProtocolBuilder.getMessage. One-way requests are sent with `!` and produce no reply.
 * All other requests are sent with `!!` and answered with a RemoteReply written to the channel:
 * a successful reply carrying the result (null when `!!` returned None), or, if anything inside
 * the request/reply step throws, a failed reply whose exception field is
 * "<exception class name>$<exception message>".
 *
 * @param request the decoded remote request
 * @param channel the channel the reply is written to
 */
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
  // Common tail of both reply paths: echo the supervisor uuid when present, then build and write.
  def send(reply: RemoteReply.Builder) = {
    if (request.hasSupervisorUuid) reply.setSupervisorUuid(request.getSupervisorUuid)
    channel.write(reply.build)
  }

  log.debug("Dispatching to remote actor [%s]", request.getTarget)
  val actor = createActor(request.getTarget, request.getTimeout)
  actor.start
  val message = RemoteProtocolBuilder.getMessage(request)

  if (request.getIsOneWay) actor ! message
  else {
    try {
      val answer = actor !! message
      val result: AnyRef = if (answer.isDefined) answer.get else null
      log.debug("Returning result from actor invocation [%s]", result)
      val success = RemoteReply.newBuilder
        .setId(request.getId)
        .setIsSuccessful(true)
        .setIsActor(true)
      RemoteProtocolBuilder.setMessage(result, success)
      send(success)
    } catch {
      case e: Throwable =>
        log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e)
        e.printStackTrace
        val failure = RemoteReply.newBuilder
          .setId(request.getId)
          .setException(e.getClass.getName + "$" + e.getMessage)
          .setIsSuccessful(false)
          .setIsActor(true)
        send(failure)
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Invokes a method on the active object named by the request's target.
 *
 * The target is obtained through createActiveObject. The request payload is read as an array
 * of arguments, escaped proxy references are resolved with unescapeArgs, and the method is
 * looked up by name and argument classes with getDeclaredMethod on the target's class.
 * For requests that are not one-way, the invocation result is written back in a successful
 * RemoteReply. Any exception raised during lookup, invocation or reply writing is answered
 * with a failed RemoteReply whose exception field is "<class name>$<message>"; for an
 * InvocationTargetException the wrapped cause is reported instead of the wrapper.
 *
 * @param request the decoded remote request
 * @param channel the channel the reply is written to
 */
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
  // Logs the failure, prints its stack trace and writes a failed reply describing `cause`.
  def replyWithFailure(cause: Throwable) = {
    log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, cause)
    cause.printStackTrace
    val failure = RemoteReply.newBuilder
      .setId(request.getId)
      .setException(cause.getClass.getName + "$" + cause.getMessage)
      .setIsSuccessful(false)
      .setIsActor(false)
    if (request.hasSupervisorUuid) failure.setSupervisorUuid(request.getSupervisorUuid)
    channel.write(failure.build)
  }

  log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
  val target = createActiveObject(request.getTarget, request.getTimeout)

  val rawArgs = RemoteProtocolBuilder.getMessage(request).asInstanceOf[Array[AnyRef]].toList
  val rawArgClasses = rawArgs.map(_.getClass)
  val (invocationArgs, invocationArgClasses) = unescapeArgs(rawArgs, rawArgClasses, request.getTimeout)

  //continueTransaction(request)
  try {
    val method = target.getClass.getDeclaredMethod(request.getMethod, invocationArgClasses: _*)
    if (request.getIsOneWay) method.invoke(target, invocationArgs: _*)
    else {
      val result = method.invoke(target, invocationArgs: _*)
      log.debug("Returning result from remote active object invocation [%s]", result)
      val success = RemoteReply.newBuilder
        .setId(request.getId)
        .setIsSuccessful(true)
        .setIsActor(false)
      RemoteProtocolBuilder.setMessage(result, success)
      if (request.hasSupervisorUuid) success.setSupervisorUuid(request.getSupervisorUuid)
      channel.write(success.build)
    }
  } catch {
    // Report the exception thrown by the invoked method, not the reflection wrapper.
    case e: InvocationTargetException => replyWithFailure(e.getCause)
    case e: Throwable => replyWithFailure(e)
  }
}
|
|
|
|
|
|
2009-07-12 23:08:17 +02:00
|
|
|
/*
|
2009-06-25 23:47:30 +02:00
|
|
|
private def continueTransaction(request: RemoteRequest) = {
|
|
|
|
|
val tx = request.tx
|
|
|
|
|
if (tx.isDefined) {
|
|
|
|
|
tx.get.reinit
|
2009-06-29 15:01:20 +02:00
|
|
|
TransactionManagement.threadBoundTx.set(tx)
|
2009-08-15 22:44:29 +02:00
|
|
|
setThreadLocalTransaction(tx.transaction)
|
|
|
|
|
} else {
|
|
|
|
|
TransactionManagement.threadBoundTx.set(None)
|
|
|
|
|
setThreadLocalTransaction(null)
|
|
|
|
|
}
|
2009-06-25 23:47:30 +02:00
|
|
|
}
|
2009-07-12 23:08:17 +02:00
|
|
|
*/
|
2009-07-01 15:29:06 +02:00
|
|
|
/**
 * Replaces escaped active object references in an argument list with active object instances.
 *
 * An argument counts as escaped when it is a String starting with "$$ProxiedByAW". Every
 * occurrence of that marker is removed and the remainder is used as a class name: the
 * argument becomes the active object returned by createActiveObject for that name, and its
 * class entry becomes the class of that name. All other arguments and their classes are
 * copied through unchanged.
 *
 * The class is resolved through applicationLoader when one is defined, and through
 * Class.forName otherwise. This is the same rule createActiveObject uses to load the class,
 * so the instance and the class entry used for the later method lookup come from the same
 * place.
 *
 * @param args       the arguments as received
 * @param argClasses the runtime class of each argument, index-aligned with args
 * @param timeout    timeout passed on to createActiveObject for proxied arguments
 * @return a pair of index-aligned arrays: the resolved arguments and their classes
 */
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
  val unescapedArgs = new Array[AnyRef](args.size)
  val unescapedArgClasses = new Array[Class[_]](args.size)

  for (i <- 0 until args.size) {
    args(i) match {
      case escaped: String if escaped.startsWith("$$ProxiedByAW") =>
        val proxyName = escaped.replace("$$ProxiedByAW", "")
        unescapedArgs(i) = createActiveObject(proxyName, timeout)
        val proxyClass: Class[_] = applicationLoader match {
          case Some(loader) => loader.loadClass(proxyName)
          case None => Class.forName(proxyName)
        }
        unescapedArgClasses(i) = proxyClass
      case plain =>
        unescapedArgs(i) = plain
        unescapedArgClasses(i) = argClasses(i)
    }
  }

  (unescapedArgs, unescapedArgClasses)
}
|
|
|
|
|
|
2009-07-01 15:29:06 +02:00
|
|
|
/**
 * Returns the active object registered under the given class name, creating and registering
 * it first if the activeObjects map has no entry for that name.
 *
 * A new instance is produced by activeObjectFactory from the class loaded through
 * applicationLoader when one is defined, or through Class.forName otherwise. If creation
 * fails, the failure is logged at debug level, its stack trace is printed and it is rethrown.
 *
 * @param name    fully qualified class name, also used as the map key
 * @param timeout timeout handed to activeObjectFactory.newInstance
 * @return the cached or newly created active object
 */
private def createActiveObject(name: String, timeout: Long): AnyRef = {
  val cached = activeObjects.get(name)
  if (cached != null) cached
  else try {
    log.info("Creating a new remote active object [%s]", name)
    val targetClass = applicationLoader match {
      case Some(loader) => loader.loadClass(name)
      case None => Class.forName(name)
    }
    val created = activeObjectFactory.newInstance(targetClass, timeout).asInstanceOf[AnyRef]
    activeObjects.put(name, created)
    created
  } catch {
    case e: Throwable =>
      log.debug("Could not create remote active object instance due to: %s", e)
      e.printStackTrace
      throw e
  }
}
|
|
|
|
|
|
2009-07-01 15:29:06 +02:00
|
|
|
/**
 * Returns the actor registered under the given class name, creating and registering it first
 * if the actors map has no entry for that name.
 *
 * A new actor is instantiated with newInstance on the class loaded through applicationLoader
 * when one is defined, or through Class.forName otherwise, and its timeout is set before it
 * is stored. If creation fails, the failure is logged at debug level, its stack trace is
 * printed and it is rethrown.
 *
 * @param name    fully qualified class name of the actor, also used as the map key
 * @param timeout timeout assigned to a newly created actor
 * @return the cached or newly created actor
 */
private def createActor(name: String, timeout: Long): Actor = {
  val cached = actors.get(name)
  if (cached != null) cached
  else try {
    log.info("Creating a new remote actor [%s]", name)
    val actorClass = applicationLoader match {
      case Some(loader) => loader.loadClass(name)
      case None => Class.forName(name)
    }
    val created = actorClass.newInstance.asInstanceOf[Actor]
    created.timeout = timeout
    actors.put(name, created)
    created
  } catch {
    case e: Throwable =>
      log.debug("Could not create remote actor instance due to: %s", e)
      e.printStackTrace
      throw e
  }
}
|
|
|
|
|
}
|