diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 6c3183ef8c..61c920c0af 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -24,8 +24,8 @@ trait BootableRemoteActorService extends Bootable with Logging { abstract override def onLoad = { if(config.getBool("akka.remote.server.service", true)){ - Cluster.start super.onLoad //Initialize BootableActorLoaderService before remote service + Cluster.start(self.applicationLoader) log.info("Initializing Remote Actors Service...") startRemoteService log.info("Remote Actors Service initialized!") diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index fb14b6b357..62aed8af1d 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -37,6 +37,10 @@ trait Cluster { */ trait ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name") getOrElse "default" + + @volatile protected var serializer : Serializer = _ + + private[remote] def setSerializer(s : Serializer) : Unit = serializer = s } /** @@ -110,7 +114,7 @@ abstract class BasicClusterActor extends ClusterActor { case m: Message[ADDR_T] => { val (src, msg) = (m.sender, m.msg) - (Cluster.serializer in (msg, None)) match { + (serializer in (msg, None)) match { case PapersPlease => { log debug ("Asked for papers by %s", src) @@ -156,7 +160,7 @@ abstract class BasicClusterActor extends ClusterActor { * that's been set in the akka-conf */ protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = { - lazy val m = Cluster.serializer out msg + lazy val m = serializer out msg for (r <- recipients) toOneNode(r, m) } @@ -165,7 +169,7 @@ abstract class BasicClusterActor extends ClusterActor { * that's been set in the akka-conf */ protected def broadcast[T <: AnyRef](msg: T): Unit = - if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg) + if (!remotes.isEmpty) toAllNodes(serializer out msg) /** * Applies the given PartialFunction to all known RemoteAddresses @@ -205,23 +209,21 @@ abstract class BasicClusterActor extends ClusterActor { object Cluster extends Cluster with Logging { lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName - @volatile private[remote] var clusterActor: Option[ClusterActor] = None + @volatile private[remote] var clusterActor: Option[ClusterActor] = None - // FIXME Use the supervisor member field - @volatile private[remote] var supervisor: Option[Supervisor] = None - - private[remote] lazy val serializer: Serializer = - Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] - - private[remote] def createClusterActor: Option[ClusterActor] = { + private[remote] def createClusterActor(loader : ClassLoader): Option[ClusterActor] = { val name = config.getString("akka.remote.cluster.actor") if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") + + val serializer = Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)).newInstance.asInstanceOf[Serializer] + serializer setClassLoader loader try { name map { fqn => - Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] + val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] + a setSerializer serializer + a } } catch { @@ -251,13 +253,14 @@ object Cluster extends Cluster with Logging { def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) - def start: Unit = synchronized { + def start: Unit = start(None) + + def start(serializerClassLoader : Option[ClassLoader]): Unit = synchronized { log.info("Starting up Cluster Service...") - if (supervisor.isEmpty) { - for (actor <- createClusterActor; - sup <- createSupervisor(actor)) { + if (clusterActor.isEmpty) { + for{ actor <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) + sup <- createSupervisor(actor) } { clusterActor = Some(actor) - supervisor = Some(sup) sup.start } } @@ -265,8 +268,10 @@ object Cluster extends Cluster with Logging { def shutdown: Unit = synchronized { log.info("Shutting down Cluster Service...") - supervisor.foreach(_.stop) - supervisor = None + for{ + c <- clusterActor + s <- c._supervisor + } s.stop clusterActor = None } } diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala index 3eb9315126..1cc930a7eb 100644 --- a/akka-core/src/main/scala/serialization/Serializer.scala +++ b/akka-core/src/main/scala/serialization/Serializer.scala @@ -21,6 +21,10 @@ trait Serializer { def deepClone(obj: AnyRef): AnyRef def out(obj: AnyRef): Array[Byte] def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef + + protected var classLoader: Option[ClassLoader] = None + + def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) } // For Java API @@ -52,10 +56,6 @@ object Serializer { */ object Java extends Java class Java extends Serializer { - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = { @@ -107,10 +107,6 @@ object Serializer { class JavaJSON extends Serializer { private val mapper = new ObjectMapper - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def out(obj: AnyRef): Array[Byte] = { @@ -143,10 +139,6 @@ object Serializer { class ScalaJSON extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) - private var classLoader: Option[ClassLoader] = None - - def setClassLoader(cl: ClassLoader) = classLoader = Some(cl) - def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) // FIXME set ClassLoader on SJSONSerializer.SJSON