2009-06-24 15:12:47 +02:00
|
|
|
/**
|
2009-12-27 16:01:53 +01:00
|
|
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-06-24 15:12:47 +02:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.remote
|
2009-06-24 15:12:47 +02:00
|
|
|
|
|
|
|
|
import java.lang.reflect.InvocationTargetException
|
|
|
|
|
import java.net.InetSocketAddress
|
|
|
|
|
import java.util.concurrent.{ConcurrentHashMap, Executors}
|
2009-12-30 08:36:24 +01:00
|
|
|
import java.util.{Map => JMap}
|
2009-07-02 13:23:03 +02:00
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.actor.{
|
2010-09-30 16:10:39 +02:00
|
|
|
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry}
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.actor.Actor._
|
|
|
|
|
import akka.util._
|
|
|
|
|
import akka.remote.protocol.RemoteProtocol._
|
|
|
|
|
import akka.remote.protocol.RemoteProtocol.ActorType._
|
|
|
|
|
import akka.config.Config._
|
|
|
|
|
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
|
|
|
|
|
import akka.serialization.RemoteActorSerialization
|
|
|
|
|
import akka.serialization.RemoteActorSerialization._
|
2009-07-18 00:16:32 +02:00
|
|
|
|
2009-06-24 15:12:47 +02:00
|
|
|
import org.jboss.netty.bootstrap.ServerBootstrap
|
|
|
|
|
import org.jboss.netty.channel._
|
2009-12-18 21:26:03 +01:00
|
|
|
import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
|
2009-06-24 15:12:47 +02:00
|
|
|
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
|
2009-07-12 23:08:17 +02:00
|
|
|
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
|
|
|
|
|
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
2009-11-21 22:04:10 +01:00
|
|
|
import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
|
2010-04-25 20:32:52 +02:00
|
|
|
import org.jboss.netty.handler.ssl.SslHandler
|
|
|
|
|
|
2010-05-20 19:14:31 +02:00
|
|
|
import scala.collection.mutable.Map
|
2010-08-21 16:37:33 +02:00
|
|
|
import scala.reflect.BeanProperty
|
2010-05-20 19:14:31 +02:00
|
|
|
|
2009-07-23 20:01:37 +02:00
|
|
|
/**
|
2009-11-24 17:41:08 +01:00
|
|
|
* Use this object if you need a single remote server on a specific node.
|
|
|
|
|
*
|
|
|
|
|
* <pre>
|
2009-12-18 21:26:03 +01:00
|
|
|
* // takes hostname and port from 'akka.conf'
|
2009-11-25 12:42:50 +01:00
|
|
|
* RemoteNode.start
|
2009-11-24 17:41:08 +01:00
|
|
|
* </pre>
|
|
|
|
|
*
|
2009-12-18 21:26:03 +01:00
|
|
|
* <pre>
|
|
|
|
|
* RemoteNode.start(hostname, port)
|
|
|
|
|
* </pre>
|
2010-04-06 12:45:09 +02:00
|
|
|
*
|
2010-02-16 09:39:56 +01:00
|
|
|
* You can specify the class loader to use to load the remote actors.
|
|
|
|
|
* <pre>
|
|
|
|
|
* RemoteNode.start(hostname, port, classLoader)
|
|
|
|
|
* </pre>
|
2009-12-18 21:26:03 +01:00
|
|
|
*
|
2009-11-24 17:41:08 +01:00
|
|
|
* If you need to create more than one, then you can use the RemoteServer:
|
2009-12-18 21:26:03 +01:00
|
|
|
*
|
2009-11-24 17:41:08 +01:00
|
|
|
* <pre>
|
|
|
|
|
* val server = new RemoteServer
|
2009-12-18 21:26:03 +01:00
|
|
|
* server.start(hostname, port)
|
2009-11-24 17:41:08 +01:00
|
|
|
* </pre>
|
|
|
|
|
*
|
2009-07-23 20:01:37 +02:00
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
2010-01-04 21:54:54 +01:00
|
|
|
object RemoteNode extends RemoteServer
|
2009-11-24 17:41:08 +01:00
|
|
|
|
|
|
|
|
/**
|
2010-01-05 08:57:12 +01:00
|
|
|
* For internal use only.
|
2010-07-26 18:47:25 +02:00
|
|
|
* Holds configuration variables, remote actors, remote typed actors and remote servers.
|
2009-12-18 21:26:03 +01:00
|
|
|
*
|
2009-11-24 17:41:08 +01:00
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
2010-10-08 15:43:00 +02:00
|
|
|
object
|
|
|
|
|
RemoteServer {
|
2010-09-20 12:33:30 +02:00
|
|
|
val UUID_PREFIX = "uuid:"
|
2009-10-22 11:14:36 +02:00
|
|
|
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
|
2010-08-24 23:21:28 +02:00
|
|
|
val PORT = config.getInt("akka.remote.server.port", 9999)
|
2009-11-24 17:41:08 +01:00
|
|
|
|
2010-07-23 04:54:21 +02:00
|
|
|
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT)
|
2009-11-24 17:41:08 +01:00
|
|
|
|
2009-11-22 14:32:27 +01:00
|
|
|
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
|
|
|
|
|
val ZLIB_COMPRESSION_LEVEL = {
|
|
|
|
|
val level = config.getInt("akka.remote.zlib-compression-level", 6)
|
|
|
|
|
if (level < 1 && level > 9) throw new IllegalArgumentException(
|
|
|
|
|
"zlib compression level has to be within 1-9, with 1 being fastest and 9 being the most compressed")
|
|
|
|
|
level
|
|
|
|
|
}
|
2009-12-30 08:36:24 +01:00
|
|
|
|
2010-07-15 21:33:44 +02:00
|
|
|
val SECURE = {
|
2010-08-18 19:02:48 +02:00
|
|
|
/*if (config.getBool("akka.remote.ssl.service",false)) {
|
2010-07-15 21:33:44 +02:00
|
|
|
val properties = List(
|
2010-08-11 01:15:01 +02:00
|
|
|
("key-store-type" , "keyStoreType"),
|
|
|
|
|
("key-store" , "keyStore"),
|
|
|
|
|
("key-store-pass" , "keyStorePassword"),
|
|
|
|
|
("trust-store-type", "trustStoreType"),
|
|
|
|
|
("trust-store" , "trustStore"),
|
|
|
|
|
("trust-store-pass", "trustStorePassword")
|
|
|
|
|
).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2))
|
|
|
|
|
|
|
|
|
|
// If property is not set, and we have a value from our akka.conf, use that value
|
2010-08-21 16:13:16 +02:00
|
|
|
for {
|
2010-08-11 01:15:01 +02:00
|
|
|
p <- properties if System.getProperty(p._2) eq null
|
|
|
|
|
c <- config.getString(p._1)
|
|
|
|
|
} System.setProperty(p._2, c)
|
2010-08-21 16:13:16 +02:00
|
|
|
|
2010-08-11 01:15:01 +02:00
|
|
|
if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl")
|
2010-07-15 21:33:44 +02:00
|
|
|
true
|
2010-08-18 19:02:48 +02:00
|
|
|
} else */false
|
2010-07-15 21:33:44 +02:00
|
|
|
}
|
|
|
|
|
|
2010-05-20 19:14:31 +02:00
|
|
|
private val guard = new ReadWriteGuard
|
|
|
|
|
private val remoteServers = Map[Address, RemoteServer]()
|
2010-04-06 12:45:09 +02:00
|
|
|
|
2010-05-20 19:14:31 +02:00
|
|
|
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
|
|
|
|
serverFor(address) match {
|
|
|
|
|
case Some(server) => server
|
|
|
|
|
case None => (new RemoteServer).start(address)
|
|
|
|
|
}
|
2010-01-04 21:54:54 +01:00
|
|
|
}
|
|
|
|
|
|
2010-05-16 10:59:06 +02:00
|
|
|
private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] =
|
|
|
|
|
serverFor(address.getHostName, address.getPort)
|
|
|
|
|
|
2010-05-20 19:14:31 +02:00
|
|
|
private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard {
|
|
|
|
|
remoteServers.get(Address(hostname, port))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private[akka] def register(hostname: String, port: Int, server: RemoteServer) = guard.withWriteGuard {
|
2010-01-04 21:54:54 +01:00
|
|
|
remoteServers.put(Address(hostname, port), server)
|
2010-05-20 19:14:31 +02:00
|
|
|
}
|
2010-04-06 12:45:09 +02:00
|
|
|
|
2010-05-20 19:14:31 +02:00
|
|
|
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
|
2010-01-04 21:54:54 +01:00
|
|
|
remoteServers.remove(Address(hostname, port))
|
2010-05-20 19:14:31 +02:00
|
|
|
}
|
2010-09-28 13:33:07 +02:00
|
|
|
|
2009-11-24 17:41:08 +01:00
|
|
|
}
|
2009-11-22 14:32:27 +01:00
|
|
|
|
2010-08-18 09:52:11 +02:00
|
|
|
/**
|
|
|
|
|
* Life-cycle events for RemoteServer.
|
|
|
|
|
*/
|
|
|
|
|
sealed trait RemoteServerLifeCycleEvent
|
2010-08-21 16:37:33 +02:00
|
|
|
case class RemoteServerError(@BeanProperty val cause: Throwable, @BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
|
|
|
|
case class RemoteServerShutdown(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
|
|
|
|
case class RemoteServerStarted(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
|
|
|
|
case class RemoteServerClientConnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
|
|
|
|
case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
|
2010-08-18 09:52:11 +02:00
|
|
|
|
2009-11-24 17:41:08 +01:00
|
|
|
/**
|
|
|
|
|
* Use this class if you need a more than one remote server on a specific node.
|
|
|
|
|
*
|
|
|
|
|
* <pre>
|
|
|
|
|
* val server = new RemoteServer
|
|
|
|
|
* server.start
|
|
|
|
|
* </pre>
|
|
|
|
|
*
|
|
|
|
|
* If you need to create more than one, then you can use the RemoteServer:
|
|
|
|
|
*
|
|
|
|
|
* <pre>
|
2009-11-25 12:42:50 +01:00
|
|
|
* RemoteNode.start
|
2009-11-24 17:41:08 +01:00
|
|
|
* </pre>
|
|
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
2010-08-18 09:52:11 +02:00
|
|
|
class RemoteServer extends Logging with ListenerManagement {
|
2010-09-20 12:33:30 +02:00
|
|
|
import RemoteServer._
|
2010-08-19 20:55:28 +02:00
|
|
|
def name = "RemoteServer@" + hostname + ":" + port
|
2009-08-11 12:16:50 +02:00
|
|
|
|
2010-09-28 13:33:07 +02:00
|
|
|
private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
|
2010-09-07 10:12:26 +02:00
|
|
|
|
|
|
|
|
def hostname = address.hostname
|
|
|
|
|
def port = address.port
|
2009-12-18 21:26:03 +01:00
|
|
|
|
2010-04-06 12:45:09 +02:00
|
|
|
@volatile private var _isRunning = false
|
2009-06-25 23:47:30 +02:00
|
|
|
|
2009-06-25 13:07:58 +02:00
|
|
|
private val factory = new NioServerSocketChannelFactory(
|
2009-06-24 15:12:47 +02:00
|
|
|
Executors.newCachedThreadPool,
|
|
|
|
|
Executors.newCachedThreadPool)
|
|
|
|
|
|
2009-06-25 13:07:58 +02:00
|
|
|
private val bootstrap = new ServerBootstrap(factory)
|
2009-06-24 15:12:47 +02:00
|
|
|
|
2009-12-18 21:26:03 +01:00
|
|
|
// group of open channels, used for clean-up
|
2009-12-30 08:36:24 +01:00
|
|
|
private val openChannels: ChannelGroup = new DefaultChannelGroup("akka-remote-server")
|
2009-12-18 21:26:03 +01:00
|
|
|
|
2010-04-06 12:45:09 +02:00
|
|
|
def isRunning = _isRunning
|
|
|
|
|
|
2010-05-16 10:59:06 +02:00
|
|
|
def start: RemoteServer =
|
|
|
|
|
start(hostname, port, None)
|
|
|
|
|
|
2010-05-25 15:14:53 +02:00
|
|
|
def start(loader: ClassLoader): RemoteServer =
|
|
|
|
|
start(hostname, port, Some(loader))
|
2010-05-16 10:59:06 +02:00
|
|
|
|
|
|
|
|
def start(address: InetSocketAddress): RemoteServer =
|
|
|
|
|
start(address.getHostName, address.getPort, None)
|
2009-10-13 11:18:21 +02:00
|
|
|
|
2010-05-25 15:14:53 +02:00
|
|
|
def start(address: InetSocketAddress, loader: ClassLoader): RemoteServer =
|
|
|
|
|
start(address.getHostName, address.getPort, Some(loader))
|
2009-10-13 11:18:21 +02:00
|
|
|
|
2010-05-16 10:59:06 +02:00
|
|
|
def start(_hostname: String, _port: Int): RemoteServer =
|
|
|
|
|
start(_hostname, _port, None)
|
2009-10-13 11:18:21 +02:00
|
|
|
|
2010-06-01 18:41:39 +02:00
|
|
|
private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
|
2010-05-25 15:14:53 +02:00
|
|
|
start(_hostname, _port, Some(loader))
|
|
|
|
|
|
|
|
|
|
private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
|
2009-12-27 06:35:25 +01:00
|
|
|
try {
|
2010-04-06 12:45:09 +02:00
|
|
|
if (!_isRunning) {
|
2010-09-28 13:33:07 +02:00
|
|
|
address = Address(_hostname,_port)
|
2009-12-27 06:35:25 +01:00
|
|
|
log.info("Starting remote server at [%s:%s]", hostname, port)
|
2010-01-04 21:54:54 +01:00
|
|
|
RemoteServer.register(hostname, port, this)
|
2010-05-05 22:45:19 +02:00
|
|
|
val pipelineFactory = new RemoteServerPipelineFactory(
|
2010-09-07 10:12:26 +02:00
|
|
|
name, openChannels, loader, this)
|
2009-12-30 08:36:24 +01:00
|
|
|
bootstrap.setPipelineFactory(pipelineFactory)
|
2009-12-27 06:35:25 +01:00
|
|
|
bootstrap.setOption("child.tcpNoDelay", true)
|
|
|
|
|
bootstrap.setOption("child.keepAlive", true)
|
|
|
|
|
bootstrap.setOption("child.reuseAddress", true)
|
2010-07-23 04:54:21 +02:00
|
|
|
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
|
2009-12-27 06:35:25 +01:00
|
|
|
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
|
2010-04-06 12:45:09 +02:00
|
|
|
_isRunning = true
|
2010-01-04 21:54:54 +01:00
|
|
|
Cluster.registerLocalNode(hostname, port)
|
2010-09-12 11:24:27 +02:00
|
|
|
notifyListeners(RemoteServerStarted(this))
|
2010-04-06 12:45:09 +02:00
|
|
|
}
|
2009-12-27 06:35:25 +01:00
|
|
|
} catch {
|
2010-08-18 09:52:11 +02:00
|
|
|
case e =>
|
|
|
|
|
log.error(e, "Could not start up remote server")
|
2010-09-12 11:24:27 +02:00
|
|
|
notifyListeners(RemoteServerError(e, this))
|
2009-06-25 23:47:30 +02:00
|
|
|
}
|
2010-05-16 10:59:06 +02:00
|
|
|
this
|
2009-06-25 23:47:30 +02:00
|
|
|
}
|
2009-11-24 17:41:08 +01:00
|
|
|
|
2010-04-06 12:45:09 +02:00
|
|
|
def shutdown = synchronized {
|
|
|
|
|
if (_isRunning) {
|
2010-07-14 14:38:56 +02:00
|
|
|
try {
|
|
|
|
|
RemoteServer.unregister(hostname, port)
|
|
|
|
|
openChannels.disconnect
|
|
|
|
|
openChannels.close.awaitUninterruptibly
|
|
|
|
|
bootstrap.releaseExternalResources
|
|
|
|
|
Cluster.deregisterLocalNode(hostname, port)
|
2010-09-12 11:24:27 +02:00
|
|
|
notifyListeners(RemoteServerShutdown(this))
|
2010-07-14 14:38:56 +02:00
|
|
|
} catch {
|
2010-07-26 12:19:17 +02:00
|
|
|
case e: java.nio.channels.ClosedChannelException => {}
|
2010-07-18 07:28:56 +02:00
|
|
|
case e => log.warning("Could not close remote server channel in a graceful way")
|
2010-07-14 14:38:56 +02:00
|
|
|
}
|
2010-04-06 12:45:09 +02:00
|
|
|
}
|
2009-11-24 17:41:08 +01:00
|
|
|
}
|
2010-02-16 15:39:09 +01:00
|
|
|
|
2010-09-09 10:42:03 +02:00
|
|
|
/**
|
|
|
|
|
* Register typed actor by interface name.
|
|
|
|
|
*/
|
|
|
|
|
def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
|
2010-02-17 15:32:17 +01:00
|
|
|
|
|
|
|
|
/**
|
2010-09-06 16:33:55 +02:00
|
|
|
* Register remote typed actor by a specific id.
|
|
|
|
|
* @param id custom actor id
|
|
|
|
|
* @param typedActor typed actor to register
|
2010-02-17 15:32:17 +01:00
|
|
|
*/
|
2010-09-06 16:33:55 +02:00
|
|
|
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
|
2010-09-20 12:33:30 +02:00
|
|
|
log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id)
|
|
|
|
|
if (id.startsWith(UUID_PREFIX)) {
|
|
|
|
|
registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid())
|
|
|
|
|
} else {
|
|
|
|
|
registerTypedActor(id, typedActor, typedActors())
|
2010-09-06 10:15:44 +02:00
|
|
|
}
|
|
|
|
|
}
|
2010-02-17 15:32:17 +01:00
|
|
|
|
|
|
|
|
/**
|
2010-09-07 11:02:12 +02:00
|
|
|
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
2010-02-17 15:32:17 +01:00
|
|
|
*/
|
2010-09-13 13:31:42 +02:00
|
|
|
def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
|
2010-02-17 15:32:17 +01:00
|
|
|
|
|
|
|
|
/**
|
2010-04-06 12:45:09 +02:00
|
|
|
* Register Remote Actor by a specific 'id' passed as argument.
|
2010-05-07 11:19:19 +02:00
|
|
|
* <p/>
|
2010-05-21 20:08:49 +02:00
|
|
|
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
|
2010-02-17 15:32:17 +01:00
|
|
|
*/
|
2010-08-09 19:35:07 +02:00
|
|
|
def register(id: String, actorRef: ActorRef): Unit = synchronized {
|
2010-09-20 12:33:30 +02:00
|
|
|
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
|
|
|
|
|
if (id.startsWith(UUID_PREFIX)) {
|
|
|
|
|
register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid())
|
|
|
|
|
} else {
|
|
|
|
|
register(id, actorRef, actors())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2010-09-22 11:37:23 +02:00
|
|
|
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
|
2010-04-06 12:45:09 +02:00
|
|
|
if (_isRunning) {
|
2010-09-20 12:33:30 +02:00
|
|
|
if (!registry.contains(id)) {
|
2010-08-16 08:24:26 +02:00
|
|
|
if (!actorRef.isRunning) actorRef.start
|
2010-09-20 12:33:30 +02:00
|
|
|
registry.put(id, actorRef)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2010-09-22 11:37:23 +02:00
|
|
|
private def registerTypedActor[Key](id: Key, typedActor: AnyRef, registry: ConcurrentHashMap[Key, AnyRef]) {
|
2010-09-20 12:33:30 +02:00
|
|
|
if (_isRunning) {
|
|
|
|
|
if (!registry.contains(id)) {
|
|
|
|
|
registry.put(id, typedActor)
|
2010-08-16 08:24:26 +02:00
|
|
|
}
|
2010-04-06 12:45:09 +02:00
|
|
|
}
|
2010-02-17 15:32:17 +01:00
|
|
|
}
|
2010-05-05 22:45:19 +02:00
|
|
|
|
|
|
|
|
/**
|
2010-09-07 12:54:10 +02:00
|
|
|
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
|
2010-05-05 22:45:19 +02:00
|
|
|
*/
|
2010-08-09 19:35:07 +02:00
|
|
|
def unregister(actorRef: ActorRef):Unit = synchronized {
|
2010-05-05 22:45:19 +02:00
|
|
|
if (_isRunning) {
|
2010-09-07 10:12:26 +02:00
|
|
|
log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid)
|
2010-09-23 14:50:11 +02:00
|
|
|
actors().remove(actorRef.id,actorRef)
|
|
|
|
|
actorsByUuid().remove(actorRef.uuid,actorRef)
|
2010-05-07 11:19:19 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2010-05-21 20:08:49 +02:00
|
|
|
* Unregister Remote Actor by specific 'id'.
|
2010-05-07 11:19:19 +02:00
|
|
|
* <p/>
|
2010-05-21 20:08:49 +02:00
|
|
|
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
2010-05-07 11:19:19 +02:00
|
|
|
*/
|
2010-08-09 19:35:07 +02:00
|
|
|
def unregister(id: String):Unit = synchronized {
|
2010-05-07 11:19:19 +02:00
|
|
|
if (_isRunning) {
|
|
|
|
|
log.info("Unregistering server side remote actor with id [%s]", id)
|
2010-09-20 12:33:30 +02:00
|
|
|
if (id.startsWith(UUID_PREFIX)) {
|
|
|
|
|
actorsByUuid().remove(id.substring(UUID_PREFIX.length))
|
|
|
|
|
} else {
|
2010-09-23 14:50:11 +02:00
|
|
|
val actorRef = actors() get id
|
|
|
|
|
actorsByUuid().remove(actorRef.uuid,actorRef)
|
|
|
|
|
actors().remove(id,actorRef)
|
2010-09-20 12:33:30 +02:00
|
|
|
}
|
2010-05-05 22:45:19 +02:00
|
|
|
}
|
|
|
|
|
}
|
2010-08-18 09:52:11 +02:00
|
|
|
|
2010-09-06 16:33:55 +02:00
|
|
|
/**
|
|
|
|
|
* Unregister Remote Typed Actor by specific 'id'.
|
|
|
|
|
* <p/>
|
|
|
|
|
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
|
|
|
|
*/
|
|
|
|
|
def unregisterTypedActor(id: String):Unit = synchronized {
|
|
|
|
|
if (_isRunning) {
|
|
|
|
|
log.info("Unregistering server side remote typed actor with id [%s]", id)
|
2010-09-20 12:33:30 +02:00
|
|
|
if (id.startsWith(UUID_PREFIX)) {
|
|
|
|
|
typedActorsByUuid().remove(id.substring(UUID_PREFIX.length))
|
|
|
|
|
} else {
|
|
|
|
|
typedActors().remove(id)
|
|
|
|
|
}
|
2010-09-06 16:33:55 +02:00
|
|
|
}
|
|
|
|
|
}
|
2010-09-06 10:15:44 +02:00
|
|
|
|
2010-08-19 12:46:17 +02:00
|
|
|
protected override def manageLifeCycleOfListeners = false
|
|
|
|
|
|
2010-09-12 11:24:27 +02:00
|
|
|
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
|
2010-09-06 09:35:02 +02:00
|
|
|
|
2010-09-28 13:33:07 +02:00
|
|
|
private[akka] def actors() = ActorRegistry.actors(address)
|
|
|
|
|
private[akka] def actorsByUuid() = ActorRegistry.actorsByUuid(address)
|
|
|
|
|
private[akka] def typedActors() = ActorRegistry.typedActors(address)
|
|
|
|
|
private[akka] def typedActorsByUuid() = ActorRegistry.typedActorsByUuid(address)
|
2009-06-24 15:12:47 +02:00
|
|
|
}
|
|
|
|
|
|
2010-04-25 20:32:52 +02:00
|
|
|
object RemoteServerSslContext {
|
2010-07-15 21:33:44 +02:00
|
|
|
import javax.net.ssl.SSLContext
|
2010-04-25 20:32:52 +02:00
|
|
|
|
2010-08-11 01:15:01 +02:00
|
|
|
val (client, server) = {
|
2010-04-25 20:32:52 +02:00
|
|
|
val protocol = "TLS"
|
2010-07-15 21:33:44 +02:00
|
|
|
//val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509")
|
|
|
|
|
//val store = KeyStore.getInstance("JKS")
|
2010-04-25 20:32:52 +02:00
|
|
|
val s = SSLContext.getInstance(protocol)
|
2010-08-12 15:36:05 +02:00
|
|
|
s.init(null, null, null)
|
2010-04-25 20:32:52 +02:00
|
|
|
val c = SSLContext.getInstance(protocol)
|
2010-08-12 15:36:05 +02:00
|
|
|
c.init(null, null, null)
|
|
|
|
|
(c, s)
|
2010-04-25 20:32:52 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2009-07-27 21:21:28 +02:00
|
|
|
/**
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
2009-12-27 22:56:55 +01:00
|
|
|
class RemoteServerPipelineFactory(
|
2009-12-30 08:36:24 +01:00
|
|
|
val name: String,
|
|
|
|
|
val openChannels: ChannelGroup,
|
|
|
|
|
val loader: Option[ClassLoader],
|
2010-08-18 09:52:11 +02:00
|
|
|
val server: RemoteServer) extends ChannelPipelineFactory {
|
2009-11-24 17:41:08 +01:00
|
|
|
import RemoteServer._
|
|
|
|
|
|
2009-12-18 21:26:03 +01:00
|
|
|
def getPipeline: ChannelPipeline = {
|
2010-07-15 21:33:44 +02:00
|
|
|
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
|
|
|
|
|
|
2010-08-20 11:54:57 +02:00
|
|
|
lazy val engine = {
|
|
|
|
|
val e = RemoteServerSslContext.server.createSSLEngine()
|
|
|
|
|
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
|
|
|
|
|
e.setUseClientMode(false)
|
|
|
|
|
e
|
|
|
|
|
}
|
2010-04-25 20:32:52 +02:00
|
|
|
|
2010-08-11 01:15:01 +02:00
|
|
|
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
2010-07-15 21:33:44 +02:00
|
|
|
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
|
|
|
|
val lenPrep = new LengthFieldPrepender(4)
|
|
|
|
|
val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance)
|
|
|
|
|
val protobufEnc = new ProtobufEncoder
|
2010-08-11 01:15:01 +02:00
|
|
|
val (enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
|
|
|
|
|
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
|
|
|
|
case _ => (join(), join())
|
2009-11-22 14:32:27 +01:00
|
|
|
}
|
2010-07-15 21:33:44 +02:00
|
|
|
|
2010-09-07 10:12:26 +02:00
|
|
|
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
|
2010-07-15 21:33:44 +02:00
|
|
|
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
|
2009-12-30 08:48:22 +01:00
|
|
|
new StaticChannelPipeline(stages: _*)
|
2009-07-18 00:16:32 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2009-07-27 21:21:28 +02:00
|
|
|
/**
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
2010-03-29 09:33:32 +02:00
|
|
|
@ChannelHandler.Sharable
|
2009-12-27 22:56:55 +01:00
|
|
|
class RemoteServerHandler(
|
|
|
|
|
val name: String,
|
2009-12-30 08:36:24 +01:00
|
|
|
val openChannels: ChannelGroup,
|
|
|
|
|
val applicationLoader: Option[ClassLoader],
|
2010-08-18 09:52:11 +02:00
|
|
|
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
|
2010-09-20 12:33:30 +02:00
|
|
|
import RemoteServer._
|
2009-11-22 15:25:16 +01:00
|
|
|
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
|
2009-12-18 21:26:03 +01:00
|
|
|
|
2010-06-24 08:48:48 +02:00
|
|
|
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
|
2009-12-27 22:56:55 +01:00
|
|
|
|
2009-12-18 21:26:03 +01:00
|
|
|
/**
|
2010-09-13 11:08:43 +02:00
|
|
|
* ChannelOpen overridden to store open channels for a clean postStop of a RemoteServer.
|
2010-06-15 13:15:00 +02:00
|
|
|
* If a channel is closed before, it is automatically removed from the open channels group.
|
2009-12-18 21:26:03 +01:00
|
|
|
*/
|
2010-08-21 16:37:33 +02:00
|
|
|
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)
|
2010-08-10 21:42:27 +02:00
|
|
|
|
2010-08-21 16:37:33 +02:00
|
|
|
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
|
|
|
|
log.debug("Remote client connected to [%s]", server.name)
|
2010-08-11 01:15:01 +02:00
|
|
|
if (RemoteServer.SECURE) {
|
2010-08-21 16:37:33 +02:00
|
|
|
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
|
2010-08-10 21:42:27 +02:00
|
|
|
|
2010-07-15 21:33:44 +02:00
|
|
|
// Begin handshake.
|
2010-08-21 16:37:33 +02:00
|
|
|
sslHandler.handshake().addListener(new ChannelFutureListener {
|
2010-08-11 01:15:01 +02:00
|
|
|
def operationComplete(future: ChannelFuture): Unit = {
|
2010-08-21 16:37:33 +02:00
|
|
|
if (future.isSuccess) {
|
|
|
|
|
openChannels.add(future.getChannel)
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerClientConnected(server))
|
2010-08-21 16:37:33 +02:00
|
|
|
} else future.getChannel.close
|
2010-07-15 21:33:44 +02:00
|
|
|
}
|
|
|
|
|
})
|
2010-08-26 08:50:31 +02:00
|
|
|
} else {
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerClientConnected(server))
|
2010-07-15 21:33:44 +02:00
|
|
|
}
|
2010-04-25 20:32:52 +02:00
|
|
|
}
|
|
|
|
|
|
2010-08-21 16:37:33 +02:00
|
|
|
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
|
|
|
|
log.debug("Remote client disconnected from [%s]", server.name)
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerClientDisconnected(server))
|
2010-08-21 16:37:33 +02:00
|
|
|
}
|
|
|
|
|
|
2009-06-24 15:12:47 +02:00
|
|
|
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
|
2009-11-21 20:51:03 +01:00
|
|
|
if (event.isInstanceOf[ChannelStateEvent] &&
|
|
|
|
|
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
|
2009-06-24 15:12:47 +02:00
|
|
|
log.debug(event.toString)
|
|
|
|
|
}
|
|
|
|
|
super.handleUpstream(ctx, event)
|
|
|
|
|
}
|
|
|
|
|
|
2009-07-03 17:15:36 +02:00
|
|
|
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
|
2009-06-24 15:12:47 +02:00
|
|
|
val message = event.getMessage
|
2010-07-02 11:14:49 +02:00
|
|
|
if (message eq null) throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
|
2010-05-07 11:19:19 +02:00
|
|
|
if (message.isInstanceOf[RemoteRequestProtocol]) {
|
|
|
|
|
handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel)
|
2009-11-21 20:51:03 +01:00
|
|
|
}
|
2009-06-24 15:12:47 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
2009-11-22 15:25:16 +01:00
|
|
|
log.error(event.getCause, "Unexpected exception from remote downstream")
|
2009-06-24 15:12:47 +02:00
|
|
|
event.getChannel.close
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerError(event.getCause, server))
|
2009-06-24 15:12:47 +02:00
|
|
|
}
|
|
|
|
|
|
2010-05-07 11:19:19 +02:00
|
|
|
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
|
|
|
|
|
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
|
2010-09-20 17:15:54 +02:00
|
|
|
request.getActorInfo.getActorType match {
|
|
|
|
|
case SCALA_ACTOR => dispatchToActor(request, channel)
|
|
|
|
|
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
|
|
|
|
|
case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
|
|
|
|
|
case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]")
|
|
|
|
|
}
|
2009-06-24 15:12:47 +02:00
|
|
|
}
|
|
|
|
|
|
2010-05-07 11:19:19 +02:00
|
|
|
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
|
2010-07-26 18:47:25 +02:00
|
|
|
val actorInfo = request.getActorInfo
|
|
|
|
|
log.debug("Dispatching to remote actor [%s:%s]", actorInfo.getTarget, actorInfo.getUuid)
|
|
|
|
|
|
2010-08-18 09:52:11 +02:00
|
|
|
val actorRef = createActor(actorInfo).start
|
2010-07-26 18:47:25 +02:00
|
|
|
|
2010-06-24 08:48:48 +02:00
|
|
|
val message = MessageSerializer.deserialize(request.getMessage)
|
2010-06-30 16:26:15 +02:00
|
|
|
val sender =
|
2010-07-02 00:16:11 +05:30
|
|
|
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
|
2010-06-15 13:15:00 +02:00
|
|
|
else None
|
2010-07-26 18:47:25 +02:00
|
|
|
|
2010-08-12 15:36:05 +02:00
|
|
|
message match { // first match on system messages
|
|
|
|
|
case RemoteActorSystemMessage.Stop => actorRef.stop
|
|
|
|
|
case _ => // then match on user defined messages
|
|
|
|
|
if (request.getIsOneWay) actorRef.!(message)(sender)
|
2010-09-16 15:58:46 +02:00
|
|
|
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some(
|
|
|
|
|
new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
|
|
|
|
|
override def onComplete(result: AnyRef) {
|
|
|
|
|
log.debug("Returning result from actor invocation [%s]", result)
|
|
|
|
|
val replyBuilder = RemoteReplyProtocol.newBuilder
|
2010-09-21 16:00:47 +02:00
|
|
|
.setUuid(request.getUuid)
|
|
|
|
|
.setMessage(MessageSerializer.serialize(result))
|
|
|
|
|
.setIsSuccessful(true)
|
|
|
|
|
.setIsActor(true)
|
2010-09-16 15:58:46 +02:00
|
|
|
|
|
|
|
|
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
channel.write(replyBuilder.build)
|
|
|
|
|
} catch {
|
|
|
|
|
case e: Throwable =>
|
|
|
|
|
server.notifyListeners(RemoteServerError(e, server))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onCompleteException(exception: Throwable) {
|
|
|
|
|
try {
|
|
|
|
|
channel.write(createErrorReplyMessage(exception, request, true))
|
|
|
|
|
} catch {
|
|
|
|
|
case e: Throwable =>
|
|
|
|
|
server.notifyListeners(RemoteServerError(e, server))
|
|
|
|
|
}
|
|
|
|
|
}
|
2010-08-12 15:36:05 +02:00
|
|
|
}
|
2010-09-16 15:58:46 +02:00
|
|
|
))
|
2009-12-18 21:26:03 +01:00
|
|
|
}
|
2009-06-25 23:47:30 +02:00
|
|
|
}
|
|
|
|
|
|
2010-07-26 18:47:25 +02:00
|
|
|
private def dispatchToTypedActor(request: RemoteRequestProtocol, channel: Channel) = {
|
|
|
|
|
val actorInfo = request.getActorInfo
|
|
|
|
|
val typedActorInfo = actorInfo.getTypedActorInfo
|
|
|
|
|
log.debug("Dispatching to remote typed actor [%s :: %s]", typedActorInfo.getMethod, typedActorInfo.getInterface)
|
2010-07-31 00:41:43 +02:00
|
|
|
val typedActor = createTypedActor(actorInfo)
|
2009-06-25 23:47:30 +02:00
|
|
|
|
2010-06-24 08:48:48 +02:00
|
|
|
val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
|
2009-07-01 15:29:06 +02:00
|
|
|
val argClasses = args.map(_.getClass)
|
2009-06-25 23:47:30 +02:00
|
|
|
|
|
|
|
|
try {
|
2010-07-31 00:41:43 +02:00
|
|
|
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
|
|
|
|
|
if (request.getIsOneWay) messageReceiver.invoke(typedActor, args: _*)
|
2009-06-25 23:47:30 +02:00
|
|
|
else {
|
2010-07-31 00:41:43 +02:00
|
|
|
val result = messageReceiver.invoke(typedActor, args: _*)
|
2010-07-26 18:47:25 +02:00
|
|
|
log.debug("Returning result from remote typed actor invocation [%s]", result)
|
2010-05-07 11:19:19 +02:00
|
|
|
val replyBuilder = RemoteReplyProtocol.newBuilder
|
2010-09-17 16:04:25 +02:00
|
|
|
.setUuid(request.getUuid)
|
2010-06-24 08:48:48 +02:00
|
|
|
.setMessage(MessageSerializer.serialize(result))
|
2009-12-18 21:26:03 +01:00
|
|
|
.setIsSuccessful(true)
|
|
|
|
|
.setIsActor(false)
|
2009-07-18 00:16:32 +02:00
|
|
|
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
2010-07-26 18:47:25 +02:00
|
|
|
channel.write(replyBuilder.build)
|
2009-06-25 23:47:30 +02:00
|
|
|
}
|
|
|
|
|
} catch {
|
2010-08-18 09:52:11 +02:00
|
|
|
case e: InvocationTargetException =>
|
|
|
|
|
channel.write(createErrorReplyMessage(e.getCause, request, false))
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerError(e, server))
|
2010-08-18 09:52:11 +02:00
|
|
|
case e: Throwable =>
|
|
|
|
|
channel.write(createErrorReplyMessage(e, request, false))
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerError(e, server))
|
2009-06-25 23:47:30 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2010-09-20 12:33:30 +02:00
|
|
|
private def findActorById(id: String) : ActorRef = {
|
|
|
|
|
server.actors().get(id)
|
2010-09-13 13:31:42 +02:00
|
|
|
}
|
|
|
|
|
|
2010-09-20 12:33:30 +02:00
|
|
|
private def findActorByUuid(uuid: String) : ActorRef = {
|
|
|
|
|
server.actorsByUuid().get(uuid)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def findTypedActorById(id: String) : AnyRef = {
|
|
|
|
|
server.typedActors().get(id)
|
2010-09-13 13:31:42 +02:00
|
|
|
}
|
|
|
|
|
|
2010-09-20 12:33:30 +02:00
|
|
|
private def findTypedActorByUuid(uuid: String) : AnyRef = {
|
|
|
|
|
server.typedActorsByUuid().get(uuid)
|
2010-09-13 13:31:42 +02:00
|
|
|
}
|
|
|
|
|
|
2010-10-04 14:13:49 +02:00
|
|
|
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
|
|
|
|
|
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
|
|
|
|
|
findActorByUuid(id.substring(UUID_PREFIX.length))
|
|
|
|
|
} else {
|
|
|
|
|
findActorById(id)
|
|
|
|
|
}
|
|
|
|
|
if (actorRefOrNull eq null) {
|
|
|
|
|
actorRefOrNull = findActorByUuid(uuid)
|
|
|
|
|
}
|
|
|
|
|
actorRefOrNull
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def findTypedActorByIdOrUuid(id: String, uuid: String) : AnyRef = {
|
|
|
|
|
var actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
|
|
|
|
|
findTypedActorByUuid(id.substring(UUID_PREFIX.length))
|
|
|
|
|
} else {
|
|
|
|
|
findTypedActorById(id)
|
|
|
|
|
}
|
|
|
|
|
if (actorRefOrNull eq null) {
|
|
|
|
|
actorRefOrNull = findTypedActorByUuid(uuid)
|
|
|
|
|
}
|
|
|
|
|
actorRefOrNull
|
|
|
|
|
}
|
2010-09-20 12:33:30 +02:00
|
|
|
|
2010-04-06 12:45:09 +02:00
|
|
|
/**
|
|
|
|
|
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
|
2010-07-29 17:29:51 +02:00
|
|
|
*
|
2010-04-06 12:45:09 +02:00
|
|
|
* If actor already created then just return it from the registry.
|
2010-07-29 17:29:51 +02:00
|
|
|
*
|
2010-04-06 12:45:09 +02:00
|
|
|
* Does not start the actor.
|
|
|
|
|
*/
|
2010-07-26 18:47:25 +02:00
|
|
|
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
|
2010-09-16 13:50:57 +02:00
|
|
|
val uuid = actorInfo.getUuid
|
|
|
|
|
val id = actorInfo.getId
|
2010-09-13 13:31:42 +02:00
|
|
|
|
2010-08-11 01:15:01 +02:00
|
|
|
val name = actorInfo.getTarget
|
2010-07-26 18:47:25 +02:00
|
|
|
val timeout = actorInfo.getTimeout
|
|
|
|
|
|
2010-10-04 14:13:49 +02:00
|
|
|
val actorRefOrNull = findActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
|
2010-09-20 12:33:30 +02:00
|
|
|
|
2010-05-07 11:19:19 +02:00
|
|
|
if (actorRefOrNull eq null) {
|
2009-06-25 23:47:30 +02:00
|
|
|
try {
|
2009-11-21 20:51:03 +01:00
|
|
|
log.info("Creating a new remote actor [%s:%s]", name, uuid)
|
2009-08-11 12:16:50 +02:00
|
|
|
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
|
2010-05-16 20:15:08 +02:00
|
|
|
else Class.forName(name)
|
2010-10-25 18:20:29 +02:00
|
|
|
val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
|
2010-09-21 16:00:47 +02:00
|
|
|
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
|
2010-09-13 13:31:42 +02:00
|
|
|
actorRef.id = id
|
2010-05-08 10:04:13 +02:00
|
|
|
actorRef.timeout = timeout
|
2010-05-13 15:40:49 +02:00
|
|
|
actorRef.remoteAddress = None
|
2010-10-04 14:13:49 +02:00
|
|
|
server.actorsByUuid.put(actorRef.uuid.toString, actorRef) // register by uuid
|
2010-05-07 11:19:19 +02:00
|
|
|
actorRef
|
2009-06-25 23:47:30 +02:00
|
|
|
} catch {
|
|
|
|
|
case e =>
|
2010-01-05 08:57:12 +01:00
|
|
|
log.error(e, "Could not create remote actor instance")
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerError(e, server))
|
2009-06-25 23:47:30 +02:00
|
|
|
throw e
|
2009-06-25 13:07:58 +02:00
|
|
|
}
|
2010-05-07 11:19:19 +02:00
|
|
|
} else actorRefOrNull
|
2009-06-24 15:12:47 +02:00
|
|
|
}
|
2010-07-26 18:47:25 +02:00
|
|
|
|
|
|
|
|
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
|
2010-09-16 13:50:57 +02:00
|
|
|
val uuid = actorInfo.getUuid
|
|
|
|
|
val id = actorInfo.getId
|
2010-09-13 13:31:42 +02:00
|
|
|
|
2010-10-04 14:13:49 +02:00
|
|
|
val typedActorOrNull = findTypedActorByIdOrUuid(id, uuidFrom(uuid.getHigh,uuid.getLow).toString)
|
2010-07-26 18:47:25 +02:00
|
|
|
|
2010-07-31 00:41:43 +02:00
|
|
|
if (typedActorOrNull eq null) {
|
2010-07-26 18:47:25 +02:00
|
|
|
val typedActorInfo = actorInfo.getTypedActorInfo
|
|
|
|
|
val interfaceClassname = typedActorInfo.getInterface
|
|
|
|
|
val targetClassname = actorInfo.getTarget
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
log.info("Creating a new remote typed actor:\n\t[%s :: %s]", interfaceClassname, targetClassname)
|
|
|
|
|
|
2010-07-29 17:29:51 +02:00
|
|
|
val (interfaceClass, targetClass) =
|
|
|
|
|
if (applicationLoader.isDefined) (applicationLoader.get.loadClass(interfaceClassname),
|
2010-07-26 18:47:25 +02:00
|
|
|
applicationLoader.get.loadClass(targetClassname))
|
|
|
|
|
else (Class.forName(interfaceClassname), Class.forName(targetClassname))
|
2010-07-29 17:29:51 +02:00
|
|
|
|
2010-07-26 18:47:25 +02:00
|
|
|
val newInstance = TypedActor.newInstance(
|
|
|
|
|
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
|
2010-10-04 14:13:49 +02:00
|
|
|
server.typedActors.put(uuidFrom(uuid.getHigh,uuid.getLow).toString, newInstance) // register by uuid
|
2010-07-26 18:47:25 +02:00
|
|
|
newInstance
|
|
|
|
|
} catch {
|
2010-08-18 09:52:11 +02:00
|
|
|
case e =>
|
|
|
|
|
log.error(e, "Could not create remote typed actor instance")
|
2010-09-12 11:24:27 +02:00
|
|
|
server.notifyListeners(RemoteServerError(e, server))
|
2010-08-18 09:52:11 +02:00
|
|
|
throw e
|
2010-07-26 18:47:25 +02:00
|
|
|
}
|
2010-07-31 00:41:43 +02:00
|
|
|
} else typedActorOrNull
|
2010-07-26 18:47:25 +02:00
|
|
|
}
|
2010-07-29 17:29:51 +02:00
|
|
|
|
2010-07-26 18:47:25 +02:00
|
|
|
private def createErrorReplyMessage(e: Throwable, request: RemoteRequestProtocol, isActor: Boolean): RemoteReplyProtocol = {
|
|
|
|
|
val actorInfo = request.getActorInfo
|
|
|
|
|
log.error(e, "Could not invoke remote typed actor [%s :: %s]", actorInfo.getTypedActorInfo.getMethod, actorInfo.getTarget)
|
|
|
|
|
val replyBuilder = RemoteReplyProtocol.newBuilder
|
2010-09-17 16:04:25 +02:00
|
|
|
.setUuid(request.getUuid)
|
2010-07-26 18:47:25 +02:00
|
|
|
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
|
|
|
|
.setIsSuccessful(false)
|
|
|
|
|
.setIsActor(isActor)
|
|
|
|
|
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
|
|
|
|
replyBuilder.build
|
|
|
|
|
}
|
2009-06-24 15:12:47 +02:00
|
|
|
}
|