Christmas cleaning

This commit is contained in:
Viktor Klang 2009-12-22 19:05:16 +01:00
parent 30be53af79
commit 82cbafac5d
14 changed files with 321 additions and 135 deletions

89
akka-comet/pom.xml Normal file
View file

@ -0,0 +1,89 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>akka-comet</artifactId>
<name>Akka Comet Module</name>
<packaging>jar</packaging>
<parent>
<artifactId>akka</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<!-- Core deps -->
<dependencies>
<dependency>
<artifactId>akka-core</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-rest</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<!-- For the Bootable -->
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-comet-webserver</artifactId>
<version>${grizzly.version}</version>
</dependency>
<!--dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-comet-webserver</artifactId>
<version>${grizzly.version}</version>
</dependency-->
<!-- For Atmosphere -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-annotations</artifactId>
<version>${atmosphere.version}</version>
</dependency>
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-jersey</artifactId>
<version>${atmosphere.version}</version>
</dependency>
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-runtime</artifactId>
<version>${atmosphere.version}</version>
</dependency>
<!--dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
</dependency-->
<!--dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-scala</artifactId>
<version>${jersey.version}</version>
</dependency-->
</dependencies>
</project>

View file

@ -2,48 +2,20 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka
package se.scalablesolutions.akka.comet
import se.scalablesolutions.akka.config.ConfiguratorRepository
import se.scalablesolutions.akka.rest.ActorComponentProviderFactory
import se.scalablesolutions.akka.rest.AkkaServlet
import se.scalablesolutions.akka.util.Logging
import com.sun.jersey.api.core.ResourceConfig
import com.sun.jersey.spi.container.servlet.ServletContainer
import com.sun.jersey.spi.container.WebApplication
import java.util.{List => JList}
import javax.servlet.{ServletConfig}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
import org.atmosphere.container.{GrizzlyCometSupport}
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
import org.atmosphere.jersey.JerseyBroadcaster
/**
* Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container,
* e.g. not using the Akka Kernel.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AkkaServlet extends ServletContainer {
import org.scala_tools.javautils.Imports._
override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
Kernel.boot // will boot if not already booted by 'main'
val configurators = ConfiguratorRepository.getConfigurators
resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava)
resourceConfig.getProperties.put(
"com.sun.jersey.spi.container.ResourceFilters",
Config.config.getList("akka.rest.filters").mkString(","))
webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
}
}
/**
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
@ -79,19 +51,19 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging
atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster))
}
override def createCometSupportResolver() : CometSupportResolver = {
override def createCometSupportResolver() : CometSupportResolver = {
import org.scala_tools.javautils.Imports._
new DefaultCometSupportResolver(config) {
type CS = CometSupport[_ <: AtmosphereResource[_,_]]
override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = {
available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match {
case Nil => new GrizzlyCometSupport(config)
case Nil => new GrizzlyCometSupport(config)
case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]])
case _ => super.resolveMultipleNativeSupportConflict(available)
case _ => super.resolveMultipleNativeSupportConflict(available)
}
}
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = {
val predef = config.getInitParameter("cometSupport")
if(testClassExists(predef))

View file

@ -0,0 +1,68 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.comet
import com.sun.grizzly.http.SelectorThread
import com.sun.grizzly.http.servlet.ServletAdapter
import com.sun.grizzly.standalone.StaticStreamAlgorithm
import javax.ws.rs.core.UriBuilder
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.util.{Bootable,Logging}
trait BootableCometActorService extends Bootable with Logging {
self : BootableActorLoaderService =>
import Config._
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
val REST_URL = "http://" + REST_HOSTNAME
val REST_PORT = config.getInt("akka.rest.port", 9998)
protected var jerseySelectorThread: Option[SelectorThread] = None
abstract override def onLoad = {
super.onLoad
if(config.getBool("akka.rest.service", true)){
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
val scheme = uri.getScheme
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
"The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
val adapter = new ServletAdapter
adapter.setHandleStaticResources(true)
adapter.setServletInstance(new AkkaCometServlet)
adapter.setContextPath(uri.getPath)
//Using autodetection for now
adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
jerseySelectorThread = Some(new SelectorThread).map { t =>
t.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
t.setPort(REST_PORT)
t.setAdapter(adapter)
t.setEnableAsyncExecution(true)
t.setAsyncHandler(ah)
t.listen
t }
log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
}
}
abstract override def onUnload = {
super.onUnload
if (jerseySelectorThread.isDefined) {
log.info("Shutting down REST service (Jersey)")
jerseySelectorThread.get.stopEndpoint
}
}
}

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.actor
import java.io.File
import java.net.URLClassLoader
import se.scalablesolutions.akka.util.{Bootable,Logging}
trait BootableActorLoaderService extends Bootable with Logging {
import Config._
val BOOT_CLASSES = config.getList("akka.boot")
var applicationLoader: Option[ClassLoader] = None
protected def runApplicationBootClasses : Option[ClassLoader] = {
val loader =
if (HOME.isDefined) {
val CONFIG = HOME.get + "/config"
val DEPLOY = HOME.get + "/deploy"
val DEPLOY_DIR = new File(DEPLOY)
if (!DEPLOY_DIR.exists) {
log.error("Could not find a deploy directory at [%s]", DEPLOY)
System.exit(-1)
}
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
getClass.getClassLoader
} else throw new IllegalStateException(
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
for (clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
Some(loader)
}
abstract override def onLoad = {
applicationLoader = runApplicationBootClasses
super.onLoad
}
abstract override def onUnload = {
ActorRegistry.shutdownAll
}
}

View file

@ -0,0 +1,35 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.actor.BootableActorLoaderService
import se.scalablesolutions.akka.util.{Bootable,Logging}
trait BootableRemoteActorService extends Bootable with Logging {
self : BootableActorLoaderService =>
import Config._
protected lazy val remoteServerThread = new Thread(new Runnable() {
def run = RemoteNode.start(self.applicationLoader)
}, "Akka Remote Service")
def startRemoteService = remoteServerThread.start
abstract override def onLoad = {
super.onLoad
if(config.getBool("akka.remote.server.service", true)) startRemoteService
}
abstract override def onUnload = {
super.onUnload
if (remoteServerThread.isAlive) {
log.info("Shutting down remote service")
RemoteNode.shutdown
remoteServerThread.join(1000)
}
}
}

View file

@ -60,7 +60,7 @@ public class RestTest extends TestCase {
*/
private static SelectorThread startJersey() {
try {
Servlet servlet = new se.scalablesolutions.akka.AkkaServlet();
Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet();
ServletAdapter adapter = new ServletAdapter();
adapter.setServletInstance(servlet);
adapter.setContextPath(URI.getPath());

View file

@ -47,6 +47,11 @@
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>akka-comet</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<!-- For Atmosphere -->
<dependency>

View file

@ -4,17 +4,10 @@
package se.scalablesolutions.akka
import com.sun.grizzly.http.SelectorThread
import com.sun.grizzly.http.servlet.ServletAdapter
import com.sun.grizzly.standalone.StaticStreamAlgorithm
import javax.ws.rs.core.UriBuilder
import java.io.File
import java.net.URLClassLoader
import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.comet.BootableCometActorService
import se.scalablesolutions.akka.remote.{RemoteNode,BootableRemoteActorService}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.ActorRegistry
import se.scalablesolutions.akka.actor.{ActorRegistry,BootableActorLoaderService}
/**
* The Akka Kernel.
@ -24,22 +17,12 @@ import se.scalablesolutions.akka.actor.ActorRegistry
object Kernel extends Logging {
import Config._
val BOOT_CLASSES = config.getList("akka.boot")
val RUN_REMOTE_SERVICE = config.getBool("akka.remote.server.service", true)
val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
val REST_URL = "http://" + REST_HOSTNAME
val REST_PORT = config.getInt("akka.rest.port", 9998)
// FIXME add API to shut server down gracefully
@volatile private var hasBooted = false
private var jerseySelectorThread: Option[SelectorThread] = None
private val startTime = System.currentTimeMillis
private var applicationLoader: Option[ClassLoader] = None
private lazy val remoteServerThread = new Thread(new Runnable() {
def run = RemoteNode.start(applicationLoader)
}, "Akka Remote Service")
object Bundles extends BootableActorLoaderService with BootableRemoteActorService with BootableCometActorService
def main(args: Array[String]) = boot
@ -56,11 +39,7 @@ object Kernel extends Logging {
if (!hasBooted) {
if (withBanner) printBanner
log.info("Starting Akka...")
runApplicationBootClasses
if (RUN_REMOTE_SERVICE) startRemoteService
if (RUN_REST_SERVICE) startREST
Bundles.onLoad
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
log.info("Akka started successfully")
hasBooted = true
@ -71,74 +50,12 @@ object Kernel extends Logging {
def shutdown = synchronized {
if (hasBooted) {
log.info("Shutting down Akka...")
ActorRegistry.shutdownAll
if (jerseySelectorThread.isDefined) {
log.info("Shutting down REST service (Jersey)")
jerseySelectorThread.get.stopEndpoint
}
if (remoteServerThread.isAlive) {
log.info("Shutting down remote service")
RemoteNode.shutdown
remoteServerThread.join(1000)
}
Bundles.onUnload
log.info("Akka succesfully shut down")
}
}
def startRemoteService = remoteServerThread.start
def startREST = {
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
val scheme = uri.getScheme
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
"The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
val adapter = new ServletAdapter
adapter.setHandleStaticResources(true)
adapter.setServletInstance(new AkkaCometServlet)
adapter.setContextPath(uri.getPath)
//Using autodetection for now
adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
jerseySelectorThread = Some(new SelectorThread)
jerseySelectorThread.get.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
jerseySelectorThread.get.setPort(REST_PORT)
jerseySelectorThread.get.setAdapter(adapter)
jerseySelectorThread.get.setEnableAsyncExecution(true)
jerseySelectorThread.get.setAsyncHandler(ah)
jerseySelectorThread.get.listen
log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
}
private def runApplicationBootClasses = {
val loader =
if (HOME.isDefined) {
val CONFIG = HOME.get + "/config"
val DEPLOY = HOME.get + "/deploy"
val DEPLOY_DIR = new File(DEPLOY)
if (!DEPLOY_DIR.exists) {
log.error("Could not find a deploy directory at [%s]", DEPLOY)
System.exit(-1)
}
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
getClass.getClassLoader
} else throw new IllegalStateException(
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
for (clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
applicationLoader = Some(loader)
}
def startRemoteService = Bundles.startRemoteService
private def printBanner = {
log.info(

View file

@ -23,11 +23,16 @@
<version>${project.version}</version>
</dependency>
<!-- For Jersey & Atmosphere -->
<!-- For Jersey -->
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-comet-webserver</artifactId>
<version>${grizzly.version}</version>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>

View file

@ -0,0 +1,34 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.rest
import se.scalablesolutions.akka.config.ConfiguratorRepository
import com.sun.jersey.api.core.ResourceConfig
import com.sun.jersey.spi.container.servlet.ServletContainer
import com.sun.jersey.spi.container.WebApplication
/**
* Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container,
* e.g. not using the Akka Kernel.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AkkaServlet extends ServletContainer {
import org.scala_tools.javautils.Imports._
override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
//Kernel.boot // will boot if not already booted by 'main'
val configurators = ConfiguratorRepository.getConfigurators
resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava)
resourceConfig.getProperties.put(
"com.sun.jersey.spi.container.ResourceFilters",
Config.config.getList("akka.rest.filters").mkString(","))
webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
}
}

View file

@ -13,7 +13,7 @@
</filter-mapping>
<servlet>
<servlet-name>AkkaServlet</servlet-name>
<servlet-class>se.scalablesolutions.akka.AkkaServlet</servlet-class>
<servlet-class>se.scalablesolutions.akka.rest.AkkaServlet</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>AkkaServlet</servlet-name>

View file

@ -8,7 +8,7 @@
<servlet>
<servlet-name>AkkaServlet</servlet-name>
<servlet-class>se.scalablesolutions.akka.AkkaServlet</servlet-class>
<servlet-class>se.scalablesolutions.akka.rest.AkkaServlet</servlet-class>
</servlet>
<servlet-mapping>

View file

@ -0,0 +1,10 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.util
trait Bootable {
def onLoad : Unit = ()
def onUnload : Unit = ()
}

View file

@ -55,6 +55,7 @@
<module>akka-persistence-cassandra</module>
<module>akka-persistence-mongo</module>
<module>akka-rest</module>
<module>akka-comet</module>
<module>akka-amqp</module>
<module>akka-security</module>
<module>akka-kernel</module>