From 42185955fa8c1aa3a74ca0cfd4eb2f2afa57a8be Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 25 May 2010 21:53:49 +0200 Subject: [PATCH] Initial attempt at fixing akka rest --- .../actor/BootableActorLoaderService.scala | 10 +- akka-core/src/main/scala/remote/Cluster.scala | 97 ++++++++----------- .../main/scala/ActorComponentProvider.scala | 29 ------ .../scala/ActorComponentProviderFactory.scala | 20 ---- akka-http/src/main/scala/AkkaServlet.scala | 19 ++-- config/akka-reference.conf | 1 + 6 files changed, 60 insertions(+), 116 deletions(-) delete mode 100644 akka-http/src/main/scala/ActorComponentProvider.scala delete mode 100644 akka-http/src/main/scala/ActorComponentProviderFactory.scala diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index de0d174fc7..4d98820c8c 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -11,6 +11,13 @@ import java.util.jar.JarFile import se.scalablesolutions.akka.util.{Bootable, Logging} import se.scalablesolutions.akka.config.Config._ +object ActorModules { + import java.util.concurrent.atomic.AtomicReference + private val _loader = new AtomicReference[Option[ClassLoader]](None) + def loader_? = _loader.get + private[actor] def loader_?(cl : Option[ClassLoader]) = _loader.set(cl) +} + /** * Handles all modules in the deploy directory (load and unload) */ @@ -46,15 +53,14 @@ trait BootableActorLoaderService extends Bootable with Logging { log.debug("Loading dependencies [%s]", dependencyJars) val allJars = toDeploy ::: dependencyJars - val parentClassLoader = classOf[Seq[_]].getClassLoader URLClassLoader.newInstance( allJars.toArray.asInstanceOf[Array[URL]], ClassLoader.getSystemClassLoader) - //parentClassLoader) } else getClass.getClassLoader) } abstract override def onLoad = { + ActorModules.loader_?(applicationLoader) for (loader <- applicationLoader; clazz <- BOOT_CLASSES) { log.info("Loading boot class [%s]", clazz) loader.loadClass(clazz).newInstance diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 948cb07a8b..66f7b59baa 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -23,25 +23,6 @@ trait Cluster { */ def name: String - /** - * Adds the specified hostname + port as a local node - * This information will be propagated to other nodes in the cluster - * and will be available at the other nodes through lookup and foreach - */ - def registerLocalNode(hostname: String, port: Int): Unit - - /** - * Removes the specified hostname + port from the local node - * This information will be propagated to other nodes in the cluster - * and will no longer be available at the other nodes through lookup and foreach - */ - def deregisterLocalNode(hostname: String, port: Int): Unit - - /** - * Sends the message to all Actors of the specified type on all other nodes in the cluster - */ - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit - /** * Traverses all known remote addresses avaiable at all other nodes in the cluster * and applies the given PartialFunction on the first address that it's defined at @@ -65,8 +46,6 @@ trait ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name", "default") @volatile protected var serializer : Serializer = _ - - private[remote] def setSerializer(s : Serializer) : Unit = serializer = s } /** @@ -87,6 +66,7 @@ private[akka] object ClusterActor { private[akka] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage private[akka] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage private[akka] case class Node(endpoints: List[RemoteAddress]) + private[akka] case class InitClusterActor(serializer : Serializer) } /** @@ -168,6 +148,10 @@ abstract class BasicClusterActor extends ClusterActor with Logging { local = Node(local.endpoints.filterNot(_ == s)) broadcast(Papers(local.endpoints)) } + + case InitClusterActor(s) => { + serializer = s + } } /** @@ -206,24 +190,6 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Applies the given function to all remote addresses known */ def foreach(f: (RemoteAddress) => Unit): Unit = remotes.valuesIterator.toList.flatMap(_.endpoints).foreach(f) - - /** - * Registers a local endpoint - */ - def registerLocalNode(hostname: String, port: Int): Unit = - self ! RegisterLocalNode(RemoteAddress(hostname, port)) - - /** - * Deregisters a local endpoint - */ - def deregisterLocalNode(hostname: String, port: Int): Unit = - self ! DeregisterLocalNode(RemoteAddress(hostname, port)) - - /** - * Broadcasts the specified message to all Actors of type Class on all known Nodes - */ - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = - self ! RelayedMessage(to.getName, msg) } /** @@ -232,28 +198,22 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Loads a specified ClusterActor and delegates to that instance. */ object Cluster extends Cluster with Logging { + //Import messages + import ClusterActor._ + lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName lazy val DEFAULT_CLUSTER_ACTOR_CLASS_NAME = classOf[JGroupsClusterActor].getName - @volatile private[remote] var clusterActor: Option[ClusterActor] = None @volatile private[remote] var clusterActorRef: Option[ActorRef] = None + @volatile private[akka] var classLoader : Option[ClassLoader] = Some(getClass.getClassLoader) - private[remote] def createClusterActor(loader: ClassLoader): Option[ActorRef] = { + private[remote] def createClusterActor(): Option[ActorRef] = { val name = config.getString("akka.remote.cluster.actor", DEFAULT_CLUSTER_ACTOR_CLASS_NAME) if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") - val serializer = Class.forName(config.getString( - "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] - serializer.classLoader = Some(loader) - try { - Some(Actor.actorOf { - val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor] - a setSerializer serializer - a - }) + Some(Actor.actorOf(Class.forName(name).newInstance.asInstanceOf[ClusterActor])) } catch { case e => log.error(e, "Couldn't load Cluster provider: [%s]", name) @@ -267,15 +227,27 @@ object Cluster extends Cluster with Logging { RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), Supervise(actor, LifeCycle(Permanent)) :: Nil))) + private[this] def clusterActor = if(clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor]) + def name = clusterActor.map(_.name).getOrElse("No cluster") def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf)) - def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.registerLocalNode(hostname, port)) + /**Adds the specified hostname + port as a local node + * This information will be propagated to other nodes in the cluster + * and will be available at the other nodes through lookup and foreach + */ + def registerLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! RegisterLocalNode(RemoteAddress(hostname, port))) - def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port)) + /**Removes the specified hostname + port from the local node + * This information will be propagated to other nodes in the cluster + * and will no longer be available at the other nodes through lookup and foreach + */ + def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! DeregisterLocalNode(RemoteAddress(hostname, port))) - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) + /**Sends the message to all Actors of the specified type on all other nodes in the cluster + */ + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActorRef.foreach(_ ! RelayedMessage(to.getName, msg)) def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) @@ -283,14 +255,21 @@ object Cluster extends Cluster with Logging { def start(serializerClassLoader: Option[ClassLoader]): Unit = synchronized { log.info("Starting up Cluster Service...") - if (clusterActor.isEmpty) { + if (clusterActorRef.isEmpty) { for { - actorRef <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) + actorRef <- createClusterActor() sup <- createSupervisor(actorRef) } { - clusterActorRef = Some(actorRef.start) - clusterActor = Some(actorRef.actor.asInstanceOf[ClusterActor]) + val serializer = Class.forName(config.getString( + "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] + + classLoader = serializerClassLoader orElse classLoader + serializer.classLoader = classLoader + actorRef.start sup.start + actorRef ! InitClusterActor(serializer) + clusterActorRef = Some(actorRef) } } } @@ -301,6 +280,6 @@ object Cluster extends Cluster with Logging { c <- clusterActorRef s <- c.supervisor } s.stop - clusterActor = None + classLoader = Some(getClass.getClassLoader) } } diff --git a/akka-http/src/main/scala/ActorComponentProvider.scala b/akka-http/src/main/scala/ActorComponentProvider.scala deleted file mode 100644 index 52512b6929..0000000000 --- a/akka-http/src/main/scala/ActorComponentProvider.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.rest - -import com.sun.jersey.core.spi.component.ComponentScope -import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider - -import se.scalablesolutions.akka.config.Configurator -import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.actor.Actor - -class ActorComponentProvider(val clazz: Class[_], val configurators: List[Configurator]) - extends IoCFullyManagedComponentProvider with Logging { - - override def getScope = ComponentScope.Singleton - - override def getInstance: AnyRef = { - val instances = for { - conf <- configurators - if conf.isDefined(clazz) - instance <- conf.getInstance(clazz) - } yield instance - if (instances.isEmpty) throw new IllegalArgumentException( - "No Actor or Active Object for class [" + clazz + "] could be found.\nMake sure you have defined and configured the class as an Active Object or Actor in a supervisor hierarchy.") - else instances.head.asInstanceOf[AnyRef] - } -} diff --git a/akka-http/src/main/scala/ActorComponentProviderFactory.scala b/akka-http/src/main/scala/ActorComponentProviderFactory.scala deleted file mode 100644 index e1ea94347f..0000000000 --- a/akka-http/src/main/scala/ActorComponentProviderFactory.scala +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.rest - -import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider,IoCComponentProviderFactory} -import com.sun.jersey.core.spi.component.{ComponentContext} - -import se.scalablesolutions.akka.config.Configurator -import se.scalablesolutions.akka.util.Logging - -class ActorComponentProviderFactory(val configurators: List[Configurator]) -extends IoCComponentProviderFactory with Logging { - override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz) - - override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = { - configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null) - } -} diff --git a/akka-http/src/main/scala/AkkaServlet.scala b/akka-http/src/main/scala/AkkaServlet.scala index aecdaa9671..6a53e5629f 100644 --- a/akka-http/src/main/scala/AkkaServlet.scala +++ b/akka-http/src/main/scala/AkkaServlet.scala @@ -4,9 +4,8 @@ package se.scalablesolutions.akka.rest -import se.scalablesolutions.akka.config.ConfiguratorRepository import se.scalablesolutions.akka.config.Config.config - +import se.scalablesolutions.akka.actor.ActorModules import com.sun.jersey.api.core.ResourceConfig import com.sun.jersey.spi.container.servlet.ServletContainer import com.sun.jersey.spi.container.WebApplication @@ -21,13 +20,21 @@ class AkkaServlet extends ServletContainer { import scala.collection.JavaConversions._ override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { - val configurators = ConfiguratorRepository.getConfigurators - - resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces)) + resourceConfig.getProperties.put( + "com.sun.jersey.config.property.packages", + config.getList("akka.rest.resource_packages").mkString(";") + ) resourceConfig.getProperties.put( "com.sun.jersey.spi.container.ResourceFilters", config.getList("akka.rest.filters").mkString(",")) - webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators)) + val cl = Thread.currentThread.getContextClassLoader + Thread.currentThread.setContextClassLoader(ActorModules.loader_?.getOrElse(cl)) + try { + webApplication.initiate(resourceConfig) + } + finally{ + Thread.currentThread.setContextClassLoader(cl) + } } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index b61bdd773f..84fee17176 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -50,6 +50,7 @@ hostname = "localhost" port = 9998 filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use + resource_packages = ["sample.rest.scala","ample.rest.java"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) #IF you are using a KerberosAuthenticationActor