Initial attempt at fixing akka rest

This commit is contained in:
Viktor Klang 2010-05-25 21:53:49 +02:00
parent 829ab8dcc3
commit 42185955fa
6 changed files with 60 additions and 116 deletions

View file

@ -11,6 +11,13 @@ import java.util.jar.JarFile
import se.scalablesolutions.akka.util.{Bootable, Logging}
import se.scalablesolutions.akka.config.Config._
object ActorModules {
import java.util.concurrent.atomic.AtomicReference
private val _loader = new AtomicReference[Option[ClassLoader]](None)
def loader_? = _loader.get
private[actor] def loader_?(cl : Option[ClassLoader]) = _loader.set(cl)
}
/**
* Handles all modules in the deploy directory (load and unload)
*/
@ -46,15 +53,14 @@ trait BootableActorLoaderService extends Bootable with Logging {
log.debug("Loading dependencies [%s]", dependencyJars)
val allJars = toDeploy ::: dependencyJars
val parentClassLoader = classOf[Seq[_]].getClassLoader
URLClassLoader.newInstance(
allJars.toArray.asInstanceOf[Array[URL]],
ClassLoader.getSystemClassLoader)
//parentClassLoader)
} else getClass.getClassLoader)
}
abstract override def onLoad = {
ActorModules.loader_?(applicationLoader)
for (loader <- applicationLoader; clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance

View file

@ -23,25 +23,6 @@ trait Cluster {
*/
def name: String
/**
* Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach
*/
def registerLocalNode(hostname: String, port: Int): Unit
/**
* Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach
*/
def deregisterLocalNode(hostname: String, port: Int): Unit
/**
* Sends the message to all Actors of the specified type on all other nodes in the cluster
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit
/**
* Traverses all known remote addresses avaiable at all other nodes in the cluster
* and applies the given PartialFunction on the first address that it's defined at
@ -65,8 +46,6 @@ trait ClusterActor extends Actor with Cluster {
val name = config.getString("akka.remote.cluster.name", "default")
@volatile protected var serializer : Serializer = _
private[remote] def setSerializer(s : Serializer) : Unit = serializer = s
}
/**
@ -87,6 +66,7 @@ private[akka] object ClusterActor {
private[akka] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
private[akka] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
private[akka] case class Node(endpoints: List[RemoteAddress])
private[akka] case class InitClusterActor(serializer : Serializer)
}
/**
@ -168,6 +148,10 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
local = Node(local.endpoints.filterNot(_ == s))
broadcast(Papers(local.endpoints))
}
case InitClusterActor(s) => {
serializer = s
}
}
/**
@ -206,24 +190,6 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* Applies the given function to all remote addresses known
*/
def foreach(f: (RemoteAddress) => Unit): Unit = remotes.valuesIterator.toList.flatMap(_.endpoints).foreach(f)
/**
* Registers a local endpoint
*/
def registerLocalNode(hostname: String, port: Int): Unit =
self ! RegisterLocalNode(RemoteAddress(hostname, port))
/**
* Deregisters a local endpoint
*/
def deregisterLocalNode(hostname: String, port: Int): Unit =
self ! DeregisterLocalNode(RemoteAddress(hostname, port))
/**
* Broadcasts the specified message to all Actors of type Class on all known Nodes
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
self ! RelayedMessage(to.getName, msg)
}
/**
@ -232,28 +198,22 @@ abstract class BasicClusterActor extends ClusterActor with Logging {
* Loads a specified ClusterActor and delegates to that instance.
*/
object Cluster extends Cluster with Logging {
//Import messages
import ClusterActor._
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
lazy val DEFAULT_CLUSTER_ACTOR_CLASS_NAME = classOf[JGroupsClusterActor].getName
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
@volatile private[remote] var clusterActorRef: Option[ActorRef] = None
@volatile private[akka] var classLoader : Option[ClassLoader] = Some(getClass.getClassLoader)
private[remote] def createClusterActor(loader: ClassLoader): Option[ActorRef] = {
private[remote] def createClusterActor(): Option[ActorRef] = {
val name = config.getString("akka.remote.cluster.actor", DEFAULT_CLUSTER_ACTOR_CLASS_NAME)
if (name.isEmpty) throw new IllegalArgumentException(
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
val serializer = Class.forName(config.getString(
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
serializer.classLoader = Some(loader)
try {
Some(Actor.actorOf {
val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor]
a setSerializer serializer
a
})
Some(Actor.actorOf(Class.forName(name).newInstance.asInstanceOf[ClusterActor]))
} catch {
case e =>
log.error(e, "Couldn't load Cluster provider: [%s]", name)
@ -267,15 +227,27 @@ object Cluster extends Cluster with Logging {
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor, LifeCycle(Permanent)) :: Nil)))
private[this] def clusterActor = if(clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor])
def name = clusterActor.map(_.name).getOrElse("No cluster")
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf))
def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.registerLocalNode(hostname, port))
/**Adds the specified hostname + port as a local node
* This information will be propagated to other nodes in the cluster
* and will be available at the other nodes through lookup and foreach
*/
def registerLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! RegisterLocalNode(RemoteAddress(hostname, port)))
def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port))
/**Removes the specified hostname + port from the local node
* This information will be propagated to other nodes in the cluster
* and will no longer be available at the other nodes through lookup and foreach
*/
def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! DeregisterLocalNode(RemoteAddress(hostname, port)))
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
/**Sends the message to all Actors of the specified type on all other nodes in the cluster
*/
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActorRef.foreach(_ ! RelayedMessage(to.getName, msg))
def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))
@ -283,14 +255,21 @@ object Cluster extends Cluster with Logging {
def start(serializerClassLoader: Option[ClassLoader]): Unit = synchronized {
log.info("Starting up Cluster Service...")
if (clusterActor.isEmpty) {
if (clusterActorRef.isEmpty) {
for {
actorRef <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader)
actorRef <- createClusterActor()
sup <- createSupervisor(actorRef)
} {
clusterActorRef = Some(actorRef.start)
clusterActor = Some(actorRef.actor.asInstanceOf[ClusterActor])
val serializer = Class.forName(config.getString(
"akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
classLoader = serializerClassLoader orElse classLoader
serializer.classLoader = classLoader
actorRef.start
sup.start
actorRef ! InitClusterActor(serializer)
clusterActorRef = Some(actorRef)
}
}
}
@ -301,6 +280,6 @@ object Cluster extends Cluster with Logging {
c <- clusterActorRef
s <- c.supervisor
} s.stop
clusterActor = None
classLoader = Some(getClass.getClassLoader)
}
}

View file

@ -1,29 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.rest
import com.sun.jersey.core.spi.component.ComponentScope
import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider
import se.scalablesolutions.akka.config.Configurator
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.Actor
class ActorComponentProvider(val clazz: Class[_], val configurators: List[Configurator])
extends IoCFullyManagedComponentProvider with Logging {
override def getScope = ComponentScope.Singleton
override def getInstance: AnyRef = {
val instances = for {
conf <- configurators
if conf.isDefined(clazz)
instance <- conf.getInstance(clazz)
} yield instance
if (instances.isEmpty) throw new IllegalArgumentException(
"No Actor or Active Object for class [" + clazz + "] could be found.\nMake sure you have defined and configured the class as an Active Object or Actor in a supervisor hierarchy.")
else instances.head.asInstanceOf[AnyRef]
}
}

View file

@ -1,20 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.rest
import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider,IoCComponentProviderFactory}
import com.sun.jersey.core.spi.component.{ComponentContext}
import se.scalablesolutions.akka.config.Configurator
import se.scalablesolutions.akka.util.Logging
class ActorComponentProviderFactory(val configurators: List[Configurator])
extends IoCComponentProviderFactory with Logging {
override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz)
override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = {
configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
}
}

View file

@ -4,9 +4,8 @@
package se.scalablesolutions.akka.rest
import se.scalablesolutions.akka.config.ConfiguratorRepository
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.actor.ActorModules
import com.sun.jersey.api.core.ResourceConfig
import com.sun.jersey.spi.container.servlet.ServletContainer
import com.sun.jersey.spi.container.WebApplication
@ -21,13 +20,21 @@ class AkkaServlet extends ServletContainer {
import scala.collection.JavaConversions._
override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
val configurators = ConfiguratorRepository.getConfigurators
resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces))
resourceConfig.getProperties.put(
"com.sun.jersey.config.property.packages",
config.getList("akka.rest.resource_packages").mkString(";")
)
resourceConfig.getProperties.put(
"com.sun.jersey.spi.container.ResourceFilters",
config.getList("akka.rest.filters").mkString(","))
webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
val cl = Thread.currentThread.getContextClassLoader
Thread.currentThread.setContextClassLoader(ActorModules.loader_?.getOrElse(cl))
try {
webApplication.initiate(resourceConfig)
}
finally{
Thread.currentThread.setContextClassLoader(cl)
}
}
}

View file

@ -50,6 +50,7 @@
hostname = "localhost"
port = 9998
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala","ample.rest.java"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
#IF you are using a KerberosAuthenticationActor