Merge branch 'Christmas_restructure'
This commit is contained in:
commit
600565773d
18 changed files with 429 additions and 148 deletions
89
akka-comet/pom.xml
Normal file
89
akka-comet/pom.xml
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>akka-comet</artifactId>
|
||||
<name>Akka Comet Module</name>
|
||||
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<artifactId>akka</artifactId>
|
||||
<groupId>se.scalablesolutions.akka</groupId>
|
||||
<version>0.6</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<!-- Core deps -->
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<artifactId>akka-core</artifactId>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<artifactId>akka-rest</artifactId>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For the Bootable -->
|
||||
<dependency>
|
||||
<groupId>com.sun.grizzly</groupId>
|
||||
<artifactId>grizzly-comet-webserver</artifactId>
|
||||
<version>${grizzly.version}</version>
|
||||
</dependency>
|
||||
<!--dependency>
|
||||
<groupId>com.sun.grizzly</groupId>
|
||||
<artifactId>grizzly-comet-webserver</artifactId>
|
||||
<version>${grizzly.version}</version>
|
||||
</dependency-->
|
||||
<!-- For Atmosphere -->
|
||||
<dependency>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
<version>2.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.atmosphere</groupId>
|
||||
<artifactId>atmosphere-annotations</artifactId>
|
||||
<version>${atmosphere.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.atmosphere</groupId>
|
||||
<artifactId>atmosphere-jersey</artifactId>
|
||||
<version>${atmosphere.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.atmosphere</groupId>
|
||||
<artifactId>atmosphere-runtime</artifactId>
|
||||
<version>${atmosphere.version}</version>
|
||||
</dependency>
|
||||
<!--dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
<version>${jersey.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
<version>${jersey.version}</version>
|
||||
</dependency-->
|
||||
<!--dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-json</artifactId>
|
||||
<version>${jersey.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.ws.rs</groupId>
|
||||
<artifactId>jsr311-api</artifactId>
|
||||
<version>1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey.contribs</groupId>
|
||||
<artifactId>jersey-scala</artifactId>
|
||||
<version>${jersey.version}</version>
|
||||
</dependency-->
|
||||
</dependencies>
|
||||
</project>
|
||||
48
akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala
Normal file
48
akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.comet
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor}
|
||||
import se.scalablesolutions.akka.remote.{Cluster}
|
||||
import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster}
|
||||
import scala.reflect.{BeanProperty}
|
||||
|
||||
sealed trait ClusterCometMessageType
|
||||
case class ClusterCometBroadcast(val name : String, val msg : AnyRef) extends ClusterCometMessageType
|
||||
|
||||
/**
|
||||
* Enables explicit clustering of Atmosphere (Comet) resources
|
||||
* Annotate the endpoint which has the @Broadcast annotation with
|
||||
* @org.atmosphere.annotation.Cluster(Array(classOf[AkkClusterBroadcastFilter])){ name = "someUniqueName" }
|
||||
* that's all folks!
|
||||
* Note: In the future, clustering comet will be transparent
|
||||
*/
|
||||
|
||||
class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] {
|
||||
@BeanProperty var clusterName = ""
|
||||
@BeanProperty var broadcaster : Broadcaster = null
|
||||
|
||||
override def init : Unit = ()
|
||||
|
||||
def destroy : Unit = stop
|
||||
|
||||
def filter(o : AnyRef) : AnyRef = o match {
|
||||
case ClusterCometBroadcast(_,m) => m //Do not re-broadcast, just unbox and pass along
|
||||
|
||||
case m : AnyRef => { //Relay message to the cluster and pass along
|
||||
Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],ClusterCometBroadcast(clusterName,m))
|
||||
m
|
||||
}
|
||||
}
|
||||
|
||||
def receive = {
|
||||
//Only handle messages intended for this particular instance
|
||||
case b@ClusterCometBroadcast(c,_) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b
|
||||
case _ =>
|
||||
}
|
||||
|
||||
//Since this class is instantiated by Atmosphere, we need to make sure it's started
|
||||
start
|
||||
}
|
||||
|
|
@ -2,48 +2,20 @@
|
|||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka
|
||||
package se.scalablesolutions.akka.comet
|
||||
|
||||
import se.scalablesolutions.akka.config.ConfiguratorRepository
|
||||
import se.scalablesolutions.akka.rest.ActorComponentProviderFactory
|
||||
import se.scalablesolutions.akka.rest.AkkaServlet
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import com.sun.jersey.api.core.ResourceConfig
|
||||
import com.sun.jersey.spi.container.servlet.ServletContainer
|
||||
import com.sun.jersey.spi.container.WebApplication
|
||||
|
||||
import java.util.{List => JList}
|
||||
|
||||
import javax.servlet.{ServletConfig}
|
||||
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
|
||||
|
||||
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
|
||||
import org.atmosphere.container.{GrizzlyCometSupport}
|
||||
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
|
||||
import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
|
||||
import org.atmosphere.jersey.JerseyBroadcaster
|
||||
|
||||
/**
|
||||
* Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container,
|
||||
* e.g. not using the Akka Kernel.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class AkkaServlet extends ServletContainer {
|
||||
import org.scala_tools.javautils.Imports._
|
||||
|
||||
override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
|
||||
Kernel.boot // will boot if not already booted by 'main'
|
||||
|
||||
val configurators = ConfiguratorRepository.getConfigurators
|
||||
|
||||
resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava)
|
||||
resourceConfig.getProperties.put(
|
||||
"com.sun.jersey.spi.container.ResourceFilters",
|
||||
Config.config.getList("akka.rest.filters").mkString(","))
|
||||
|
||||
webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
|
||||
|
|
@ -74,24 +46,32 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We override this to avoid Atmosphere looking for it's atmosphere.xml file
|
||||
* Instead we specify what semantics we want in code.
|
||||
*/
|
||||
override def loadConfiguration(sc: ServletConfig) {
|
||||
config = new AtmosphereConfig { supportSession = false }
|
||||
atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster))
|
||||
}
|
||||
|
||||
override def createCometSupportResolver() : CometSupportResolver = {
|
||||
/**
|
||||
* This method is overridden because Akka Kernel is bundles with Grizzly, so if we deploy the Kernel in another container,
|
||||
* we need to handle that.
|
||||
*/
|
||||
override def createCometSupportResolver() : CometSupportResolver = {
|
||||
import org.scala_tools.javautils.Imports._
|
||||
|
||||
|
||||
new DefaultCometSupportResolver(config) {
|
||||
type CS = CometSupport[_ <: AtmosphereResource[_,_]]
|
||||
override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = {
|
||||
available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match {
|
||||
case Nil => new GrizzlyCometSupport(config)
|
||||
case Nil => new GrizzlyCometSupport(config)
|
||||
case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]])
|
||||
case _ => super.resolveMultipleNativeSupportConflict(available)
|
||||
case _ => super.resolveMultipleNativeSupportConflict(available)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = {
|
||||
val predef = config.getInitParameter("cometSupport")
|
||||
if(testClassExists(predef))
|
||||
72
akka-comet/src/main/scala/BootableCometActorService.scala
Normal file
72
akka-comet/src/main/scala/BootableCometActorService.scala
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.comet
|
||||
|
||||
import com.sun.grizzly.http.SelectorThread
|
||||
import com.sun.grizzly.http.servlet.ServletAdapter
|
||||
import com.sun.grizzly.standalone.StaticStreamAlgorithm
|
||||
|
||||
import javax.ws.rs.core.UriBuilder
|
||||
import se.scalablesolutions.akka.actor.BootableActorLoaderService
|
||||
import se.scalablesolutions.akka.util.{Bootable,Logging}
|
||||
|
||||
/**
|
||||
* Handles the Akka Comet Support (load/unload)
|
||||
*/
|
||||
trait BootableCometActorService extends Bootable with Logging {
|
||||
self : BootableActorLoaderService =>
|
||||
|
||||
import Config._
|
||||
|
||||
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
|
||||
val REST_URL = "http://" + REST_HOSTNAME
|
||||
val REST_PORT = config.getInt("akka.rest.port", 9998)
|
||||
protected var jerseySelectorThread: Option[SelectorThread] = None
|
||||
|
||||
abstract override def onLoad = {
|
||||
super.onLoad
|
||||
if(config.getBool("akka.rest.service", true)){
|
||||
|
||||
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
|
||||
|
||||
val scheme = uri.getScheme
|
||||
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
|
||||
"The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
|
||||
|
||||
log.info("Attempting to start REST service on uri [%s]",uri)
|
||||
|
||||
val adapter = new ServletAdapter
|
||||
adapter.setHandleStaticResources(true)
|
||||
adapter.setServletInstance(new AkkaCometServlet)
|
||||
adapter.setContextPath(uri.getPath)
|
||||
//Using autodetection for now
|
||||
//adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
|
||||
if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
|
||||
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
|
||||
|
||||
val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
|
||||
ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
|
||||
jerseySelectorThread = Some(new SelectorThread).map { t =>
|
||||
t.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
|
||||
t.setPort(REST_PORT)
|
||||
t.setAdapter(adapter)
|
||||
t.setEnableAsyncExecution(true)
|
||||
t.setAsyncHandler(ah)
|
||||
t.listen
|
||||
t }
|
||||
|
||||
log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def onUnload = {
|
||||
super.onUnload
|
||||
|
||||
if (jerseySelectorThread.isDefined) {
|
||||
log.info("Shutting down REST service (Jersey)")
|
||||
jerseySelectorThread.get.stopEndpoint
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.io.File
|
||||
import java.net.URLClassLoader
|
||||
import se.scalablesolutions.akka.util.{Bootable,Logging}
|
||||
|
||||
/**
|
||||
* Handles all modules in the deploy directory (load and unload)
|
||||
*/
|
||||
trait BootableActorLoaderService extends Bootable with Logging {
|
||||
import Config._
|
||||
|
||||
val BOOT_CLASSES = config.getList("akka.boot")
|
||||
var applicationLoader: Option[ClassLoader] = None
|
||||
|
||||
protected def runApplicationBootClasses : Option[ClassLoader] = {
|
||||
val loader =
|
||||
if (HOME.isDefined) {
|
||||
val CONFIG = HOME.get + "/config"
|
||||
val DEPLOY = HOME.get + "/deploy"
|
||||
val DEPLOY_DIR = new File(DEPLOY)
|
||||
if (!DEPLOY_DIR.exists) {
|
||||
log.error("Could not find a deploy directory at [%s]", DEPLOY)
|
||||
System.exit(-1)
|
||||
}
|
||||
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
|
||||
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
|
||||
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
|
||||
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
|
||||
getClass.getClassLoader
|
||||
} else throw new IllegalStateException(
|
||||
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
|
||||
for (clazz <- BOOT_CLASSES) {
|
||||
log.info("Loading boot class [%s]", clazz)
|
||||
loader.loadClass(clazz).newInstance
|
||||
}
|
||||
|
||||
Some(loader)
|
||||
}
|
||||
|
||||
abstract override def onLoad = {
|
||||
applicationLoader = runApplicationBootClasses
|
||||
super.onLoad
|
||||
}
|
||||
|
||||
abstract override def onUnload = {
|
||||
ActorRegistry.shutdownAll
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.remote
|
||||
|
||||
import se.scalablesolutions.akka.actor.BootableActorLoaderService
|
||||
import se.scalablesolutions.akka.util.{Bootable,Logging}
|
||||
|
||||
/**
|
||||
* This bundle/service is responsible for booting up and shutting down the remote actors facility
|
||||
* It's used in Kernel
|
||||
*/
|
||||
|
||||
trait BootableRemoteActorService extends Bootable with Logging {
|
||||
self : BootableActorLoaderService =>
|
||||
|
||||
import Config._
|
||||
|
||||
protected lazy val remoteServerThread = new Thread(new Runnable() {
|
||||
def run = RemoteNode.start(self.applicationLoader)
|
||||
}, "Akka Remote Service")
|
||||
|
||||
def startRemoteService = remoteServerThread.start
|
||||
|
||||
abstract override def onLoad = {
|
||||
super.onLoad //Make sure the actors facility is loaded before we load the remote service
|
||||
if(config.getBool("akka.remote.server.service", true)){
|
||||
log.info("Initializing Remote Actors Service...")
|
||||
startRemoteService
|
||||
log.info("Remote Actors Service initialized!")
|
||||
}
|
||||
}
|
||||
|
||||
abstract override def onUnload = {
|
||||
super.onUnload
|
||||
|
||||
if (remoteServerThread.isAlive) {
|
||||
log.info("Shutting down Remote Actors Service")
|
||||
RemoteNode.shutdown
|
||||
remoteServerThread.join(1000)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -151,7 +151,7 @@ class JGroupsClusterActor extends ClusterActor {
|
|||
|
||||
def receive = {
|
||||
case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
|
||||
log debug ("Zombie: %s", x)
|
||||
log debug ("Killing Zombie Node: %s", x)
|
||||
broadcast(x :: Nil, PapersPlease)
|
||||
remotes = remotes - x
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ public class PersistenceManager {
|
|||
private static volatile boolean isRunning = false;
|
||||
public static void init() {
|
||||
if (!isRunning) {
|
||||
se.scalablesolutions.akka.Kernel.startRemoteService();
|
||||
se.scalablesolutions.akka.Kernel$.MODULE$.startRemoteService();
|
||||
isRunning = true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ public class RestTest extends TestCase {
|
|||
*/
|
||||
private static SelectorThread startJersey() {
|
||||
try {
|
||||
Servlet servlet = new se.scalablesolutions.akka.AkkaServlet();
|
||||
Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet();
|
||||
ServletAdapter adapter = new ServletAdapter();
|
||||
adapter.setServletInstance(servlet);
|
||||
adapter.setContextPath(URI.getPath());
|
||||
|
|
|
|||
|
|
@ -47,8 +47,18 @@
|
|||
<groupId>${project.groupId}</groupId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<artifactId>akka-comet</artifactId>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For Atmosphere -->
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-server</artifactId>
|
||||
<version>${jersey.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.atmosphere</groupId>
|
||||
<artifactId>atmosphere-annotations</artifactId>
|
||||
|
|
@ -88,7 +98,7 @@
|
|||
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ComponentsXmlResourceTransformer"/>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>se.scalablesolutions.akka.Kernel</mainClass>
|
||||
<mainClass>se.scalablesolutions.akka.Main</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
</configuration>
|
||||
|
|
|
|||
|
|
@ -4,17 +4,16 @@
|
|||
|
||||
package se.scalablesolutions.akka
|
||||
|
||||
import com.sun.grizzly.http.SelectorThread
|
||||
import com.sun.grizzly.http.servlet.ServletAdapter
|
||||
import com.sun.grizzly.standalone.StaticStreamAlgorithm
|
||||
|
||||
import javax.ws.rs.core.UriBuilder
|
||||
import java.io.File
|
||||
import java.net.URLClassLoader
|
||||
|
||||
import se.scalablesolutions.akka.remote.RemoteNode
|
||||
import se.scalablesolutions.akka.comet.BootableCometActorService
|
||||
import se.scalablesolutions.akka.remote.{RemoteNode,BootableRemoteActorService}
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry
|
||||
import se.scalablesolutions.akka.actor.{ActorRegistry,BootableActorLoaderService}
|
||||
|
||||
import javax.servlet.{ServletContextListener,ServletContextEvent}
|
||||
|
||||
object Main {
|
||||
def main(args: Array[String]) = Kernel.boot
|
||||
}
|
||||
|
||||
/**
|
||||
* The Akka Kernel.
|
||||
|
|
@ -22,26 +21,13 @@ import se.scalablesolutions.akka.actor.ActorRegistry
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object Kernel extends Logging {
|
||||
import Config._
|
||||
|
||||
val BOOT_CLASSES = config.getList("akka.boot")
|
||||
val RUN_REMOTE_SERVICE = config.getBool("akka.remote.server.service", true)
|
||||
val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
|
||||
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
|
||||
val REST_URL = "http://" + REST_HOSTNAME
|
||||
val REST_PORT = config.getInt("akka.rest.port", 9998)
|
||||
|
||||
// FIXME add API to shut server down gracefully
|
||||
@volatile private var hasBooted = false
|
||||
private var jerseySelectorThread: Option[SelectorThread] = None
|
||||
|
||||
private val startTime = System.currentTimeMillis
|
||||
private var applicationLoader: Option[ClassLoader] = None
|
||||
|
||||
private lazy val remoteServerThread = new Thread(new Runnable() {
|
||||
def run = RemoteNode.start(applicationLoader)
|
||||
}, "Akka Remote Service")
|
||||
|
||||
def main(args: Array[String]) = boot
|
||||
|
||||
//Bundles is what modules are to be loaded with the Kernel, this uses Jonas' AOP style mixin pattern
|
||||
object Bundles extends BootableActorLoaderService with BootableRemoteActorService with BootableCometActorService
|
||||
|
||||
/**
|
||||
* Boots up the Kernel.
|
||||
|
|
@ -56,89 +42,26 @@ object Kernel extends Logging {
|
|||
if (!hasBooted) {
|
||||
if (withBanner) printBanner
|
||||
log.info("Starting Akka...")
|
||||
|
||||
runApplicationBootClasses
|
||||
if (RUN_REMOTE_SERVICE) startRemoteService
|
||||
if (RUN_REST_SERVICE) startREST
|
||||
|
||||
Bundles.onLoad
|
||||
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
|
||||
log.info("Akka started successfully")
|
||||
hasBooted = true
|
||||
}
|
||||
}
|
||||
|
||||
// TODO document Kernel.shutdown
|
||||
/**
|
||||
* Shuts down the kernel, unloads all of the bundles
|
||||
*/
|
||||
def shutdown = synchronized {
|
||||
if (hasBooted) {
|
||||
log.info("Shutting down Akka...")
|
||||
ActorRegistry.shutdownAll
|
||||
if (jerseySelectorThread.isDefined) {
|
||||
log.info("Shutting down REST service (Jersey)")
|
||||
jerseySelectorThread.get.stopEndpoint
|
||||
}
|
||||
if (remoteServerThread.isAlive) {
|
||||
log.info("Shutting down remote service")
|
||||
RemoteNode.shutdown
|
||||
remoteServerThread.join(1000)
|
||||
}
|
||||
Bundles.onUnload
|
||||
log.info("Akka succesfully shut down")
|
||||
}
|
||||
}
|
||||
|
||||
def startRemoteService = remoteServerThread.start
|
||||
|
||||
def startREST = {
|
||||
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
|
||||
|
||||
val scheme = uri.getScheme
|
||||
if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
|
||||
"The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
|
||||
|
||||
val adapter = new ServletAdapter
|
||||
adapter.setHandleStaticResources(true)
|
||||
adapter.setServletInstance(new AkkaCometServlet)
|
||||
adapter.setContextPath(uri.getPath)
|
||||
//Using autodetection for now
|
||||
adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
|
||||
if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
|
||||
log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
|
||||
|
||||
val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
|
||||
ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
|
||||
jerseySelectorThread = Some(new SelectorThread)
|
||||
jerseySelectorThread.get.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
|
||||
jerseySelectorThread.get.setPort(REST_PORT)
|
||||
jerseySelectorThread.get.setAdapter(adapter)
|
||||
jerseySelectorThread.get.setEnableAsyncExecution(true)
|
||||
jerseySelectorThread.get.setAsyncHandler(ah)
|
||||
jerseySelectorThread.get.listen
|
||||
|
||||
log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
|
||||
}
|
||||
|
||||
private def runApplicationBootClasses = {
|
||||
val loader =
|
||||
if (HOME.isDefined) {
|
||||
val CONFIG = HOME.get + "/config"
|
||||
val DEPLOY = HOME.get + "/deploy"
|
||||
val DEPLOY_DIR = new File(DEPLOY)
|
||||
if (!DEPLOY_DIR.exists) {
|
||||
log.error("Could not find a deploy directory at [%s]", DEPLOY)
|
||||
System.exit(-1)
|
||||
}
|
||||
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
|
||||
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
|
||||
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
|
||||
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
|
||||
getClass.getClassLoader
|
||||
} else throw new IllegalStateException(
|
||||
"AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
|
||||
for (clazz <- BOOT_CLASSES) {
|
||||
log.info("Loading boot class [%s]", clazz)
|
||||
loader.loadClass(clazz).newInstance
|
||||
}
|
||||
applicationLoader = Some(loader)
|
||||
}
|
||||
//FIXME This is only being called by a test
|
||||
def startRemoteService = Bundles.startRemoteService
|
||||
|
||||
private def printBanner = {
|
||||
log.info(
|
||||
|
|
@ -151,7 +74,16 @@ object Kernel extends Logging {
|
|||
(____ /__|_ \__|_ \(____ /
|
||||
\/ \/ \/ \/
|
||||
""")
|
||||
log.info(" Running version %s", VERSION)
|
||||
log.info(" Running version %s", Config.VERSION)
|
||||
log.info("==============================")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
And this one can be added to web.xml mappings as a listener to boot and shutdown Akka
|
||||
*/
|
||||
|
||||
class Kernel extends ServletContextListener {
|
||||
def contextDestroyed(e : ServletContextEvent) : Unit = Kernel.shutdown
|
||||
def contextInitialized(e : ServletContextEvent) : Unit = Kernel.boot
|
||||
}
|
||||
|
|
@ -23,11 +23,16 @@
|
|||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For Jersey & Atmosphere -->
|
||||
<!-- For Jersey -->
|
||||
<dependency>
|
||||
<groupId>com.sun.grizzly</groupId>
|
||||
<artifactId>grizzly-comet-webserver</artifactId>
|
||||
<version>${grizzly.version}</version>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>servlet-api</artifactId>
|
||||
<version>2.5</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
<artifactId>jersey-core</artifactId>
|
||||
<version>${jersey.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
|
|
|
|||
34
akka-rest/src/main/scala/AkkaServlet.scala
Executable file
34
akka-rest/src/main/scala/AkkaServlet.scala
Executable file
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.rest
|
||||
|
||||
import se.scalablesolutions.akka.config.ConfiguratorRepository
|
||||
|
||||
import com.sun.jersey.api.core.ResourceConfig
|
||||
import com.sun.jersey.spi.container.servlet.ServletContainer
|
||||
import com.sun.jersey.spi.container.WebApplication
|
||||
|
||||
/**
|
||||
* Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container,
|
||||
* e.g. not using the Akka Kernel.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class AkkaServlet extends ServletContainer {
|
||||
import org.scala_tools.javautils.Imports._
|
||||
|
||||
override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
|
||||
//Kernel.boot // will boot if not already booted by 'main'
|
||||
|
||||
val configurators = ConfiguratorRepository.getConfigurators
|
||||
|
||||
resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava)
|
||||
resourceConfig.getProperties.put(
|
||||
"com.sun.jersey.spi.container.ResourceFilters",
|
||||
Config.config.getList("akka.rest.filters").mkString(","))
|
||||
|
||||
webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
|
||||
}
|
||||
}
|
||||
|
|
@ -13,7 +13,7 @@
|
|||
</filter-mapping>
|
||||
<servlet>
|
||||
<servlet-name>AkkaServlet</servlet-name>
|
||||
<servlet-class>se.scalablesolutions.akka.AkkaServlet</servlet-class>
|
||||
<servlet-class>se.scalablesolutions.akka.rest.AkkaServlet</servlet-class>
|
||||
</servlet>
|
||||
<servlet-mapping>
|
||||
<servlet-name>AkkaServlet</servlet-name>
|
||||
|
|
|
|||
|
|
@ -8,13 +8,14 @@ import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
|
|||
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.comet.{AkkaClusterBroadcastFilter}
|
||||
|
||||
import java.lang.Integer
|
||||
import java.nio.ByteBuffer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
||||
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend}
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
||||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
import org.atmosphere.jersey.Broadcastable
|
||||
|
|
@ -85,6 +86,7 @@ class PubSub extends Actor {
|
|||
@Broadcast
|
||||
@Path("/topic/{topic}/{message}/")
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
@Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "foo" }
|
||||
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
|
||||
|
||||
def receive = { case _ => }
|
||||
|
|
@ -149,9 +151,10 @@ class Chat extends Actor {
|
|||
case x => log.info("recieve unknown: " + x)
|
||||
}
|
||||
|
||||
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
|
||||
@Consumes(Array("application/x-www-form-urlencoded"))
|
||||
@POST
|
||||
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
|
||||
@Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "bar" }
|
||||
@Consumes(Array("application/x-www-form-urlencoded"))
|
||||
@Produces(Array("text/html"))
|
||||
def publishMessage(form: MultivaluedMap[String, String]) =
|
||||
(this !! Chat(form.getFirst("name"),
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
<servlet>
|
||||
<servlet-name>AkkaServlet</servlet-name>
|
||||
<servlet-class>se.scalablesolutions.akka.AkkaServlet</servlet-class>
|
||||
<servlet-class>se.scalablesolutions.akka.rest.AkkaServlet</servlet-class>
|
||||
</servlet>
|
||||
|
||||
<servlet-mapping>
|
||||
|
|
|
|||
10
akka-util/src/main/scala/Bootable.scala
Normal file
10
akka-util/src/main/scala/Bootable.scala
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.util
|
||||
|
||||
trait Bootable {
|
||||
def onLoad : Unit = ()
|
||||
def onUnload : Unit = ()
|
||||
}
|
||||
1
pom.xml
1
pom.xml
|
|
@ -55,6 +55,7 @@
|
|||
<module>akka-persistence-cassandra</module>
|
||||
<module>akka-persistence-mongo</module>
|
||||
<module>akka-rest</module>
|
||||
<module>akka-comet</module>
|
||||
<module>akka-amqp</module>
|
||||
<module>akka-security</module>
|
||||
<module>akka-kernel</module>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue