Added option to specify class loader to load serialized classes in the RemoteClient + cleaned up RemoteClient and RemoteServer API in this regard

This commit is contained in:
Jonas Bonér 2010-05-25 15:14:53 +02:00
parent 5736f92126
commit 176bf48f7b
5 changed files with 44 additions and 32 deletions

View file

@ -10,45 +10,38 @@ import se.scalablesolutions.akka.config.Config.config
/** /**
* This bundle/service is responsible for booting up and shutting down the remote actors facility * This bundle/service is responsible for booting up and shutting down the remote actors facility
* It's used in Kernel * <p/>
* It is used in Kernel
*/ */
trait BootableRemoteActorService extends Bootable with Logging { trait BootableRemoteActorService extends Bootable with Logging {
self : BootableActorLoaderService => self: BootableActorLoaderService =>
protected lazy val remoteServerThread = new Thread(new Runnable() { protected lazy val remoteServerThread = new Thread(new Runnable() {
def run = RemoteNode.start(self.applicationLoader) def run = {
if (self.applicationLoader.isDefined) RemoteNode.start(self.applicationLoader.get)
else RemoteNode.start
}
}, "Akka Remote Service") }, "Akka Remote Service")
def startRemoteService = remoteServerThread.start def startRemoteService = remoteServerThread.start
abstract override def onLoad = { abstract override def onLoad = {
super.onLoad //Initialize BootableActorLoaderService before remote service super.onLoad //Initialize BootableActorLoaderService before remote service
if(config.getBool("akka.remote.server.service", true)){ if (config.getBool("akka.remote.server.service", true)) {
if (config.getBool("akka.remote.cluster.service", true)) Cluster.start(self.applicationLoader)
if(config.getBool("akka.remote.cluster.service", true))
Cluster.start(self.applicationLoader)
log.info("Initializing Remote Actors Service...") log.info("Initializing Remote Actors Service...")
startRemoteService startRemoteService
log.info("Remote Actors Service initialized!") log.info("Remote Actors Service initialized")
} }
} }
abstract override def onUnload = { abstract override def onUnload = {
log.info("Shutting down Remote Actors Service") log.info("Shutting down Remote Actors Service")
RemoteNode.shutdown RemoteNode.shutdown
if (remoteServerThread.isAlive) remoteServerThread.join(1000)
if (remoteServerThread.isAlive)
remoteServerThread.join(1000)
log.info("Shutting down Cluster") log.info("Shutting down Cluster")
Cluster.shutdown Cluster.shutdown
log.info("Remote Actors Service has been shut down") log.info("Remote Actors Service has been shut down")
super.onUnload super.onUnload
} }
} }

View file

@ -67,15 +67,22 @@ object RemoteClient extends Logging {
def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
RemoteActorRef(actorRef, className, hostname, port, timeout) RemoteActorRef(actorRef, className, hostname, port, timeout)
def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port), None)
def clientFor(address: InetSocketAddress): RemoteClient = synchronized { def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient = clientFor(new InetSocketAddress(hostname, port), Some(loader))
def clientFor(address: InetSocketAddress): RemoteClient = clientFor(address, None)
def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient = clientFor(address, Some(loader))
private def clientFor(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized {
val hostname = address.getHostName val hostname = address.getHostName
val port = address.getPort val port = address.getPort
val hash = hostname + ':' + port val hash = hostname + ':' + port
loader.foreach(RemoteProtocolBuilder.setClassLoader(_))
if (remoteClients.contains(hash)) remoteClients(hash) if (remoteClients.contains(hash)) remoteClients(hash)
else { else {
val client = new RemoteClient(hostname, port) val client = new RemoteClient(hostname, port, loader)
client.connect client.connect
remoteClients += hash -> client remoteClients += hash -> client
client client
@ -126,7 +133,7 @@ object RemoteClient extends Logging {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class RemoteClient(val hostname: String, val port: Int) extends Logging { class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port val name = "RemoteClient@" + hostname + "::" + port
@volatile private[remote] var isRunning = false @volatile private[remote] var isRunning = false
@ -287,7 +294,7 @@ class RemoteClientHandler(val name: String,
} }
} catch { } catch {
case e: Exception => case e: Exception =>
client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e)) client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e))
log.error("Unexpected exception in remote client handler: %s", e) log.error("Unexpected exception in remote client handler: %s", e)
throw e throw e
} }

View file

@ -18,9 +18,10 @@ object RemoteProtocolBuilder {
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(cl: ClassLoader) = { def setClassLoader(cl: ClassLoader) = {
SERIALIZER_JAVA.classLoader = Some(cl) SERIALIZER_JAVA.classLoader = Some(cl)
SERIALIZER_JAVA_JSON.classLoader = Some(cl) SERIALIZER_JAVA_JSON.classLoader = Some(cl)
SERIALIZER_SCALA_JSON.classLoader = Some(cl) SERIALIZER_SCALA_JSON.classLoader = Some(cl)
SERIALIZER_SBINARY.classLoader = Some(cl)
} }
def getMessage(request: RemoteRequestProtocol): Any = { def getMessage(request: RemoteRequestProtocol): Any = {
@ -28,7 +29,10 @@ object RemoteProtocolBuilder {
case SerializationProtocol.JAVA => case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
case SerializationProtocol.SBINARY => case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] val classToLoad = new String(request.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray) renderer.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON => case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
@ -47,7 +51,10 @@ object RemoteProtocolBuilder {
case SerializationProtocol.JAVA => case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
case SerializationProtocol.SBINARY => case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] val classToLoad = new String(reply.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(reply.getMessage.toByteArray) renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.SCALA_JSON => case SerializationProtocol.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String] val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]

View file

@ -172,19 +172,22 @@ class RemoteServer extends Logging {
def start: RemoteServer = def start: RemoteServer =
start(hostname, port, None) start(hostname, port, None)
def start(loader: Option[ClassLoader]): RemoteServer = def start(loader: ClassLoader): RemoteServer =
start(hostname, port, loader) start(hostname, port, Some(loader))
def start(address: InetSocketAddress): RemoteServer = def start(address: InetSocketAddress): RemoteServer =
start(address.getHostName, address.getPort, None) start(address.getHostName, address.getPort, None)
def start(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteServer = def start(address: InetSocketAddress, loader: ClassLoader): RemoteServer =
start(address.getHostName, address.getPort, loader) start(address.getHostName, address.getPort, Some(loader))
def start(_hostname: String, _port: Int): RemoteServer = def start(_hostname: String, _port: Int): RemoteServer =
start(_hostname, _port, None) start(_hostname, _port, None)
def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized { private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer =
start(_hostname, _port, Some(loader))
private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized {
try { try {
if (!_isRunning) { if (!_isRunning) {
hostname = _hostname hostname = _hostname

View file

@ -169,6 +169,8 @@ object Serializer {
import sbinary.Operations._ import sbinary.Operations._
import sbinary.DefaultProtocol._ import sbinary.DefaultProtocol._
var classLoader: Option[ClassLoader] = None
def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None) def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None)
def out[T](t : T)(implicit bin : Writes[T]): Array[Byte] = toByteArray[T](t) def out[T](t : T)(implicit bin : Writes[T]): Array[Byte] = toByteArray[T](t)