Merge with master

This commit is contained in:
Viktor Klang 2010-08-03 11:40:25 +02:00
commit 98d3034a9a
271 changed files with 9230 additions and 4508 deletions

View file

@ -13,7 +13,7 @@ import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.config.Config._
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
@ -57,7 +57,7 @@ object RemoteNode extends RemoteServer
/**
* For internal use only.
* Holds configuration variables, remote actors, remote active objects and remote servers.
* Holds configuration variables, remote actors, remote typed actors and remote servers.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -65,7 +65,7 @@ object RemoteServer {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT)
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
val ZLIB_COMPRESSION_LEVEL = {
@ -76,7 +76,8 @@ object RemoteServer {
}
val SECURE = {
if(config.getBool("akka.remote.ssl.service",false)){
//TODO: Remove this when SSL is in working condition
/*if(config.getBool("akka.remote.ssl.service",false)){
val properties = List(
("key-store-type" ,"keyStoreType"),
@ -97,7 +98,7 @@ object RemoteServer {
true
}
else
else */
false
}
@ -121,7 +122,7 @@ object RemoteServer {
private class RemoteActorSet {
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val activeObjects = new ConcurrentHashMap[String, AnyRef]
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
}
private val guard = new ReadWriteGuard
@ -132,8 +133,8 @@ object RemoteServer {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
}
private[akka] def registerActiveObject(address: InetSocketAddress, name: String, activeObject: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).activeObjects.put(name, activeObject)
private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor)
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
@ -225,12 +226,12 @@ class RemoteServer extends Logging {
RemoteServer.register(hostname, port, this)
val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port))
val pipelineFactory = new RemoteServerPipelineFactory(
name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects)
name, openChannels, loader, remoteActorSet.actors, remoteActorSet.typedActors)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
_isRunning = true
Cluster.registerLocalNode(hostname, port)
@ -243,15 +244,20 @@ class RemoteServer extends Logging {
def shutdown = synchronized {
if (_isRunning) {
RemoteServer.unregister(hostname, port)
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
try {
RemoteServer.unregister(hostname, port)
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
} catch {
case e: java.nio.channels.ClosedChannelException => {}
case e => log.warning("Could not close remote server channel in a graceful way")
}
}
}
// TODO: register active object in RemoteServer as well
// TODO: register typed actor in RemoteServer as well
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
@ -331,7 +337,7 @@ class RemoteServerPipelineFactory(
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val actors: JMap[String, ActorRef],
val activeObjects: JMap[String, AnyRef]) extends ChannelPipelineFactory {
val typedActors: JMap[String, AnyRef]) extends ChannelPipelineFactory {
import RemoteServer._
def getPipeline: ChannelPipeline = {
@ -351,7 +357,7 @@ class RemoteServerPipelineFactory(
case _ => (join(),join())
}
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, activeObjects)
val remoteServer = new RemoteServerHandler(name, openChannels, loader, actors, typedActors)
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
@ -368,7 +374,7 @@ class RemoteServerHandler(
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val actors: JMap[String, ActorRef],
val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val typedActors: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
@ -422,139 +428,88 @@ class RemoteServerHandler(
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
if (request.getIsActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)
val actorType = request.getActorInfo.getActorType
if (actorType == ActorType.SCALA_ACTOR) dispatchToActor(request, channel)
else if (actorType == ActorType.JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
else if (actorType == ActorType.TYPED_ACTOR) dispatchToTypedActor(request, channel)
else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]")
}
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid)
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
val actorInfo = request.getActorInfo
log.debug("Dispatching to remote actor [%s:%s]", actorInfo.getTarget, actorInfo.getUuid)
val actorRef = createActor(actorInfo)
actorRef.start
val message = MessageSerializer.deserialize(request.getMessage)
val sender =
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef]
val result = if (resultOrNone.isDefined) resultOrNone.get else null
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
channel.write(replyBuilder.build)
} catch {
case e: Throwable =>
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
.setIsSuccessful(false)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
case e: Throwable => channel.write(createErrorReplyMessage(e, request, true))
}
}
}
private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
val activeObject = createActiveObject(request.getTarget, request.getTimeout)
private def dispatchToTypedActor(request: RemoteRequestProtocol, channel: Channel) = {
val actorInfo = request.getActorInfo
val typedActorInfo = actorInfo.getTypedActorInfo
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
val typedActor = createTypedActor(actorInfo)
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(
request.getMethod, unescapedArgClasses: _*)
if (request.getIsOneWay) messageReceiver.invoke(activeObject, unescapedArgs: _*)
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
if (request.getIsOneWay) messageReceiver.invoke(typedActor, args: _*)
else {
val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result)
val result = messageReceiver.invoke(typedActor, args: _*)
log.debug("Returning result from remote typed actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
channel.write(replyBuilder.build)
}
} catch {
case e: InvocationTargetException =>
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getCause.getClass.getName).setMessage(e.getCause.getMessage).build)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
case e: Throwable =>
log.error(e, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
case e: InvocationTargetException => channel.write(createErrorReplyMessage(e.getCause, request, false))
case e: Throwable => channel.write(createErrorReplyMessage(e, request, false))
}
}
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]], timeout: Long) = {
val unescapedArgs = new Array[AnyRef](args.size)
val unescapedArgClasses = new Array[Class[_]](args.size)
val escapedArgs = for (i <- 0 until args.size) {
val arg = args(i)
if (arg.isInstanceOf[String] && arg.asInstanceOf[String].startsWith(AW_PROXY_PREFIX)) {
val argString = arg.asInstanceOf[String]
val proxyName = argString.replace(AW_PROXY_PREFIX, "")
val activeObject = createActiveObject(proxyName, timeout)
unescapedArgs(i) = activeObject
unescapedArgClasses(i) = Class.forName(proxyName)
} else {
unescapedArgs(i) = args(i)
unescapedArgClasses(i) = argClasses(i)
}
}
(unescapedArgs, unescapedArgClasses)
}
private def createActiveObject(name: String, timeout: Long): AnyRef = {
val activeObjectOrNull = activeObjects.get(name)
if (activeObjectOrNull eq null) {
try {
log.info("Creating a new remote active object [%s]", name)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val newInstance = ActiveObject.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
} catch {
case e =>
log.error(e, "Could not create remote active object instance")
throw e
}
} else activeObjectOrNull
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
* If actor already created then just return it from the registry.
*
* Does not start the actor.
*/
private def createActor(name: String, uuid: String, timeout: Long): ActorRef = {
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
val name = actorInfo.getTarget
val uuid = actorInfo.getUuid
val timeout = actorInfo.getTimeout
val actorRefOrNull = actors.get(uuid)
if (actorRefOrNull eq null) {
try {
@ -574,4 +529,43 @@ class RemoteServerHandler(
}
} else actorRefOrNull
}
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
val uuid = actorInfo.getUuid
val typedActorOrNull = typedActors.get(uuid)
if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo
val interfaceClassname = typedActorInfo.getInterface
val targetClassname = actorInfo.getTarget
try {
log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname)
val (interfaceClass, targetClass) =
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
applicationLoader.get.loadClass(targetClassname))
else (Class.forName(interfaceClassname), Class.forName(targetClassname))
val newInstance = TypedActor.newInstance(
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
typedActors.put(uuid, newInstance)
newInstance
} catch {
case e => log.error(e, "Could not create remote typed actor instance"); throw e
}
} else typedActorOrNull
}
private def createErrorReplyMessage(e: Throwable, request: RemoteRequestProtocol, isActor: Boolean): RemoteReplyProtocol = {
val actorInfo = request.getActorInfo
log.error(e, "Could not invoke remote typed actor [%s :: %s]", actorInfo.getTypedActorInfo.getMethod, actorInfo.getTarget)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
.setIsSuccessful(false)
.setIsActor(isActor)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
replyBuilder.build
}
}