diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index 85ec2dc8ca..8e77eec2be 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -16,10 +16,10 @@ import se.scalablesolutions.akka.Config._ trait BootableActorLoaderService extends Bootable with Logging { val BOOT_CLASSES = config.getList("akka.boot") - var applicationLoader: Option[ClassLoader] = None + lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader - protected def runApplicationBootClasses : Option[ClassLoader] = { - val loader = + protected def createApplicationClassLoader : Option[ClassLoader] = { + Some( if (HOME.isDefined) { val CONFIG = HOME.get + "/config" val DEPLOY = HOME.get + "/deploy" @@ -35,16 +35,18 @@ trait BootableActorLoaderService extends Bootable with Logging { getClass.getClassLoader } else throw new IllegalStateException( "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") - for (clazz <- BOOT_CLASSES) { + ) + } + + abstract override def onLoad = { + for (loader <- applicationLoader; + clazz <- BOOT_CLASSES) + { log.info("Loading boot class [%s]", clazz) loader.loadClass(clazz).newInstance } - Some(loader) - } - abstract override def onLoad = { - applicationLoader = runApplicationBootClasses - super.onLoad + super.onLoad } abstract override def onUnload = { diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index b3ca8d4cb7..38202860fe 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -23,12 +23,17 @@ trait BootableRemoteActorService extends Bootable with Logging { def startRemoteService = remoteServerThread.start abstract override def onLoad = { - super.onLoad //Make sure the actors facility is loaded before we load the remote service if(config.getBool("akka.remote.server.service", true)){ + log.info("Starting up Cluster Service") + Cluster.start + super.onLoad //Initialize BootableActorLoaderService before remote service log.info("Initializing Remote Actors Service...") startRemoteService log.info("Remote Actors Service initialized!") } + else + super.onLoad + } abstract override def onUnload = { @@ -36,9 +41,8 @@ trait BootableRemoteActorService extends Bootable with Logging { if (remoteServerThread.isAlive) { log.info("Shutting down Remote Actors Service") RemoteNode.shutdown - log.info("Shutting down Cluster Service") - Cluster.shutdown remoteServerThread.join(1000) } + Cluster.shutdown } } \ No newline at end of file diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index f355cf5ced..1505b6be45 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -199,34 +199,40 @@ abstract class BasicClusterActor extends ClusterActor { * Loads a specified ClusterActor and delegates to that instance. */ object Cluster extends Cluster with Logging { - private[remote] val clusterActor: Option[ClusterActor] = { - val name = config.getString("akka.remote.cluster.actor","not defined") - try { - val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor] - a.start - Some(a) - } - catch { - case e => log.error(e,"Couldn't load Cluster provider: [%s]",name) - None - } - } - - private[remote] val supervisor: Option[Supervisor] = if (clusterActor.isDefined) { - val sup = SupervisorFactory( - SupervisorConfig( - RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), - Supervise(clusterActor.get, LifeCycle(Permanent)) :: Nil) - ).newInstance - sup.start - Some(sup) - } else None + @volatile private[remote] var clusterActor: Option[ClusterActor] = None + @volatile private[remote] var supervisor: Option[Supervisor] = None private[remote] lazy val serializer: Serializer = { val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) Class.forName(className).newInstance.asInstanceOf[Serializer] } + private[remote] def createClusterActor : Option[ClusterActor] = { + val name = config.getString("akka.remote.cluster.actor") + + try { + name map { fqn => + val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] + a.start + a + } + } + catch { + case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None + } + } + + private[remote] def createSupervisor(actor : ClusterActor) : Option[Supervisor] = { + val sup = SupervisorFactory( + SupervisorConfig( + RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), + Supervise(actor, LifeCycle(Permanent)) :: Nil) + ).newInstance + sup.start + Some(sup) + } + + def name = clusterActor.map(_.name).getOrElse("No cluster") def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf)) @@ -237,5 +243,19 @@ object Cluster extends Cluster with Logging { def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) - def shutdown = supervisor.foreach(_.stop) + def start : Unit = synchronized { + if(supervisor.isEmpty) { + for(actor <- createClusterActor; + sup <- createSupervisor(actor)) { + clusterActor = Some(actor) + supervisor = Some(sup) + } + } + } + + def shutdown : Unit = synchronized { + supervisor.foreach(_.stop) + supervisor = None + clusterActor = None + } } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 580cbc48cd..c52bd75afa 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -112,8 +112,6 @@ object RemoteServer { private[remote] def unregister(hostname: String, port: Int) = remoteServers.remove(Address(hostname, port)) - - private[remote] def canShutDownCluster: Boolean = remoteServers.isEmpty } /** @@ -186,7 +184,6 @@ class RemoteServer extends Logging { openChannels.close.awaitUninterruptibly(1000) bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) - if (RemoteServer.canShutDownCluster) Cluster.shutdown } }