diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 6917b38129..a971652590 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -10,45 +10,38 @@ import se.scalablesolutions.akka.config.Config.config /** * This bundle/service is responsible for booting up and shutting down the remote actors facility - * It's used in Kernel + *

+ * It is used in Kernel */ - trait BootableRemoteActorService extends Bootable with Logging { - self : BootableActorLoaderService => + self: BootableActorLoaderService => protected lazy val remoteServerThread = new Thread(new Runnable() { - def run = RemoteNode.start(self.applicationLoader) + def run = { + if (self.applicationLoader.isDefined) RemoteNode.start(self.applicationLoader.get) + else RemoteNode.start + } }, "Akka Remote Service") def startRemoteService = remoteServerThread.start abstract override def onLoad = { super.onLoad //Initialize BootableActorLoaderService before remote service - if(config.getBool("akka.remote.server.service", true)){ - - if(config.getBool("akka.remote.cluster.service", true)) - Cluster.start(self.applicationLoader) - + if (config.getBool("akka.remote.server.service", true)) { + if (config.getBool("akka.remote.cluster.service", true)) Cluster.start(self.applicationLoader) log.info("Initializing Remote Actors Service...") startRemoteService - log.info("Remote Actors Service initialized!") + log.info("Remote Actors Service initialized") } } abstract override def onUnload = { log.info("Shutting down Remote Actors Service") - RemoteNode.shutdown - - if (remoteServerThread.isAlive) - remoteServerThread.join(1000) - + if (remoteServerThread.isAlive) remoteServerThread.join(1000) log.info("Shutting down Cluster") Cluster.shutdown - log.info("Remote Actors Service has been shut down") - super.onUnload } - } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 99573e7d24..3af87fb7b5 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -67,15 +67,22 @@ object RemoteClient extends Logging { def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = RemoteActorRef(actorRef, className, hostname, port, timeout) - def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) + def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port), None) - def clientFor(address: InetSocketAddress): RemoteClient = synchronized { + def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient = clientFor(new InetSocketAddress(hostname, port), Some(loader)) + + def clientFor(address: InetSocketAddress): RemoteClient = clientFor(address, None) + + def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient = clientFor(address, Some(loader)) + + private def clientFor(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { val hostname = address.getHostName val port = address.getPort val hash = hostname + ':' + port + loader.foreach(RemoteProtocolBuilder.setClassLoader(_)) if (remoteClients.contains(hash)) remoteClients(hash) else { - val client = new RemoteClient(hostname, port) + val client = new RemoteClient(hostname, port, loader) client.connect remoteClients += hash -> client client @@ -126,7 +133,7 @@ object RemoteClient extends Logging { /** * @author Jonas Bonér */ -class RemoteClient(val hostname: String, val port: Int) extends Logging { +class RemoteClient(val hostname: String, val port: Int, loader: Option[ClassLoader]) extends Logging { val name = "RemoteClient@" + hostname + "::" + port @volatile private[remote] var isRunning = false @@ -287,7 +294,7 @@ class RemoteClientHandler(val name: String, } } catch { case e: Exception => - client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e)) + client.listeners.toArray.foreach(l => l.asInstanceOf[ActorRef] ! RemoteClientError(e)) log.error("Unexpected exception in remote client handler: %s", e) throw e } diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index 3cb3d28178..ef2657901c 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -18,9 +18,10 @@ object RemoteProtocolBuilder { private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf def setClassLoader(cl: ClassLoader) = { - SERIALIZER_JAVA.classLoader = Some(cl) - SERIALIZER_JAVA_JSON.classLoader = Some(cl) + SERIALIZER_JAVA.classLoader = Some(cl) + SERIALIZER_JAVA_JSON.classLoader = Some(cl) SERIALIZER_SCALA_JSON.classLoader = Some(cl) + SERIALIZER_SBINARY.classLoader = Some(cl) } def getMessage(request: RemoteRequestProtocol): Any = { @@ -28,7 +29,10 @@ object RemoteProtocolBuilder { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => - val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] + val classToLoad = new String(request.getMessageManifest.toByteArray) + val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad) + else Class.forName(classToLoad) + val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(request.getMessage.toByteArray) case SerializationProtocol.SCALA_JSON => val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] @@ -47,7 +51,10 @@ object RemoteProtocolBuilder { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => - val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] + val classToLoad = new String(reply.getMessageManifest.toByteArray) + val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad) + else Class.forName(classToLoad) + val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(reply.getMessage.toByteArray) case SerializationProtocol.SCALA_JSON => val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String] diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 06fdeae34c..a0b4810be3 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -172,19 +172,22 @@ class RemoteServer extends Logging { def start: RemoteServer = start(hostname, port, None) - def start(loader: Option[ClassLoader]): RemoteServer = - start(hostname, port, loader) + def start(loader: ClassLoader): RemoteServer = + start(hostname, port, Some(loader)) def start(address: InetSocketAddress): RemoteServer = start(address.getHostName, address.getPort, None) - def start(address: InetSocketAddress, loader: Option[ClassLoader]): RemoteServer = - start(address.getHostName, address.getPort, loader) + def start(address: InetSocketAddress, loader: ClassLoader): RemoteServer = + start(address.getHostName, address.getPort, Some(loader)) def start(_hostname: String, _port: Int): RemoteServer = start(_hostname, _port, None) - def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized { + private def start(_hostname: String, _port: Int, loader: ClassLoader): RemoteServer = + start(_hostname, _port, Some(loader)) + + private def start(_hostname: String, _port: Int, loader: Option[ClassLoader]): RemoteServer = synchronized { try { if (!_isRunning) { hostname = _hostname diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala index 0f64835556..7298e63fc6 100644 --- a/akka-core/src/main/scala/serialization/Serializer.scala +++ b/akka-core/src/main/scala/serialization/Serializer.scala @@ -169,6 +169,8 @@ object Serializer { import sbinary.Operations._ import sbinary.DefaultProtocol._ + var classLoader: Option[ClassLoader] = None + def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None) def out[T](t : T)(implicit bin : Writes[T]): Array[Byte] = toByteArray[T](t)