diff --git a/akka-comet/pom.xml b/akka-comet/pom.xml
new file mode 100644
index 0000000000..778940d242
--- /dev/null
+++ b/akka-comet/pom.xml
@@ -0,0 +1,89 @@
+
+ 4.0.0
+
+ akka-comet
+ Akka Comet Module
+
+ jar
+
+
+ akka
+ se.scalablesolutions.akka
+ 0.6
+ ../pom.xml
+
+
+
+
+
+ akka-core
+ ${project.groupId}
+ ${project.version}
+
+
+ akka-rest
+ ${project.groupId}
+ ${project.version}
+
+
+
+
+ com.sun.grizzly
+ grizzly-comet-webserver
+ ${grizzly.version}
+
+
+
+
+ javax.servlet
+ servlet-api
+ 2.5
+
+
+ org.atmosphere
+ atmosphere-annotations
+ ${atmosphere.version}
+
+
+ org.atmosphere
+ atmosphere-jersey
+ ${atmosphere.version}
+
+
+ org.atmosphere
+ atmosphere-runtime
+ ${atmosphere.version}
+
+
+
+
+
diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala
new file mode 100644
index 0000000000..5c98501978
--- /dev/null
+++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.comet
+
+import se.scalablesolutions.akka.actor.{Actor}
+import se.scalablesolutions.akka.remote.{Cluster}
+import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster}
+import scala.reflect.{BeanProperty}
+
+sealed trait ClusterCometMessageType
+case class ClusterCometBroadcast(val name : String, val msg : AnyRef) extends ClusterCometMessageType
+
+/**
+ * Enables explicit clustering of Atmosphere (Comet) resources
+ * Annotate the endpoint which has the @Broadcast annotation with
+ * @org.atmosphere.annotation.Cluster(Array(classOf[AkkClusterBroadcastFilter])){ name = "someUniqueName" }
+ * that's all folks!
+ * Note: In the future, clustering comet will be transparent
+ */
+
+class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] {
+ @BeanProperty var clusterName = ""
+ @BeanProperty var broadcaster : Broadcaster = null
+
+ override def init : Unit = ()
+
+ def destroy : Unit = stop
+
+ def filter(o : AnyRef) : AnyRef = o match {
+ case ClusterCometBroadcast(_,m) => m //Do not re-broadcast, just unbox and pass along
+
+ case m : AnyRef => { //Relay message to the cluster and pass along
+ Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],ClusterCometBroadcast(clusterName,m))
+ m
+ }
+ }
+
+ def receive = {
+ //Only handle messages intended for this particular instance
+ case b@ClusterCometBroadcast(c,_) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b
+ case _ =>
+ }
+
+ //Since this class is instantiated by Atmosphere, we need to make sure it's started
+ start
+}
\ No newline at end of file
diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-comet/src/main/scala/AkkaCometServlet.scala
similarity index 66%
rename from akka-kernel/src/main/scala/AkkaServlet.scala
rename to akka-comet/src/main/scala/AkkaCometServlet.scala
index 8c9fdba4c2..9b8fa06306 100755
--- a/akka-kernel/src/main/scala/AkkaServlet.scala
+++ b/akka-comet/src/main/scala/AkkaCometServlet.scala
@@ -2,48 +2,20 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-package se.scalablesolutions.akka
+package se.scalablesolutions.akka.comet
-import se.scalablesolutions.akka.config.ConfiguratorRepository
-import se.scalablesolutions.akka.rest.ActorComponentProviderFactory
+import se.scalablesolutions.akka.rest.AkkaServlet
import se.scalablesolutions.akka.util.Logging
-import com.sun.jersey.api.core.ResourceConfig
-import com.sun.jersey.spi.container.servlet.ServletContainer
-import com.sun.jersey.spi.container.WebApplication
-
import java.util.{List => JList}
import javax.servlet.{ServletConfig}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
-
-import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
import org.atmosphere.container.{GrizzlyCometSupport}
+import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
import org.atmosphere.jersey.JerseyBroadcaster
-/**
- * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container,
- * e.g. not using the Akka Kernel.
- *
- * @author Jonas Bonér
- */
-class AkkaServlet extends ServletContainer {
- import org.scala_tools.javautils.Imports._
-
- override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
- Kernel.boot // will boot if not already booted by 'main'
-
- val configurators = ConfiguratorRepository.getConfigurators
-
- resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava)
- resourceConfig.getProperties.put(
- "com.sun.jersey.spi.container.ResourceFilters",
- Config.config.getList("akka.rest.filters").mkString(","))
-
- webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
- }
-}
/**
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
@@ -74,24 +46,32 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging
}
}
+ /**
+ * We override this to avoid Atmosphere looking for it's atmosphere.xml file
+ * Instead we specify what semantics we want in code.
+ */
override def loadConfiguration(sc: ServletConfig) {
config = new AtmosphereConfig { supportSession = false }
atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster))
}
- override def createCometSupportResolver() : CometSupportResolver = {
+ /**
+ * This method is overridden because Akka Kernel is bundles with Grizzly, so if we deploy the Kernel in another container,
+ * we need to handle that.
+ */
+ override def createCometSupportResolver() : CometSupportResolver = {
import org.scala_tools.javautils.Imports._
-
+
new DefaultCometSupportResolver(config) {
type CS = CometSupport[_ <: AtmosphereResource[_,_]]
override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = {
available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match {
- case Nil => new GrizzlyCometSupport(config)
+ case Nil => new GrizzlyCometSupport(config)
case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]])
- case _ => super.resolveMultipleNativeSupportConflict(available)
+ case _ => super.resolveMultipleNativeSupportConflict(available)
}
}
-
+
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = {
val predef = config.getInitParameter("cometSupport")
if(testClassExists(predef))
diff --git a/akka-comet/src/main/scala/BootableCometActorService.scala b/akka-comet/src/main/scala/BootableCometActorService.scala
new file mode 100644
index 0000000000..67d85ea1cc
--- /dev/null
+++ b/akka-comet/src/main/scala/BootableCometActorService.scala
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.comet
+
+import com.sun.grizzly.http.SelectorThread
+import com.sun.grizzly.http.servlet.ServletAdapter
+import com.sun.grizzly.standalone.StaticStreamAlgorithm
+
+import javax.ws.rs.core.UriBuilder
+import se.scalablesolutions.akka.actor.BootableActorLoaderService
+import se.scalablesolutions.akka.util.{Bootable,Logging}
+
+/**
+ * Handles the Akka Comet Support (load/unload)
+ */
+trait BootableCometActorService extends Bootable with Logging {
+ self : BootableActorLoaderService =>
+
+ import Config._
+
+ val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
+ val REST_URL = "http://" + REST_HOSTNAME
+ val REST_PORT = config.getInt("akka.rest.port", 9998)
+ protected var jerseySelectorThread: Option[SelectorThread] = None
+
+ abstract override def onLoad = {
+ super.onLoad
+ if(config.getBool("akka.rest.service", true)){
+
+ val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
+
+ val scheme = uri.getScheme
+ if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
+ "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
+
+ log.info("Attempting to start REST service on uri [%s]",uri)
+
+ val adapter = new ServletAdapter
+ adapter.setHandleStaticResources(true)
+ adapter.setServletInstance(new AkkaCometServlet)
+ adapter.setContextPath(uri.getPath)
+ //Using autodetection for now
+ //adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
+ if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
+ log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
+
+ val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
+ ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
+ jerseySelectorThread = Some(new SelectorThread).map { t =>
+ t.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
+ t.setPort(REST_PORT)
+ t.setAdapter(adapter)
+ t.setEnableAsyncExecution(true)
+ t.setAsyncHandler(ah)
+ t.listen
+ t }
+
+ log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
+ }
+ }
+
+ abstract override def onUnload = {
+ super.onUnload
+
+ if (jerseySelectorThread.isDefined) {
+ log.info("Shutting down REST service (Jersey)")
+ jerseySelectorThread.get.stopEndpoint
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
new file mode 100644
index 0000000000..ef6d8f7eab
--- /dev/null
+++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala
@@ -0,0 +1,53 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.actor
+
+import java.io.File
+import java.net.URLClassLoader
+import se.scalablesolutions.akka.util.{Bootable,Logging}
+
+/**
+ * Handles all modules in the deploy directory (load and unload)
+ */
+trait BootableActorLoaderService extends Bootable with Logging {
+ import Config._
+
+ val BOOT_CLASSES = config.getList("akka.boot")
+ var applicationLoader: Option[ClassLoader] = None
+
+ protected def runApplicationBootClasses : Option[ClassLoader] = {
+ val loader =
+ if (HOME.isDefined) {
+ val CONFIG = HOME.get + "/config"
+ val DEPLOY = HOME.get + "/deploy"
+ val DEPLOY_DIR = new File(DEPLOY)
+ if (!DEPLOY_DIR.exists) {
+ log.error("Could not find a deploy directory at [%s]", DEPLOY)
+ System.exit(-1)
+ }
+ val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
+ log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
+ new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
+ } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
+ getClass.getClassLoader
+ } else throw new IllegalStateException(
+ "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
+ for (clazz <- BOOT_CLASSES) {
+ log.info("Loading boot class [%s]", clazz)
+ loader.loadClass(clazz).newInstance
+ }
+
+ Some(loader)
+ }
+
+ abstract override def onLoad = {
+ applicationLoader = runApplicationBootClasses
+ super.onLoad
+ }
+
+ abstract override def onUnload = {
+ ActorRegistry.shutdownAll
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
new file mode 100644
index 0000000000..38ed31651e
--- /dev/null
+++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.remote
+
+import se.scalablesolutions.akka.actor.BootableActorLoaderService
+import se.scalablesolutions.akka.util.{Bootable,Logging}
+
+/**
+ * This bundle/service is responsible for booting up and shutting down the remote actors facility
+ * It's used in Kernel
+ */
+
+trait BootableRemoteActorService extends Bootable with Logging {
+ self : BootableActorLoaderService =>
+
+ import Config._
+
+ protected lazy val remoteServerThread = new Thread(new Runnable() {
+ def run = RemoteNode.start(self.applicationLoader)
+ }, "Akka Remote Service")
+
+ def startRemoteService = remoteServerThread.start
+
+ abstract override def onLoad = {
+ super.onLoad //Make sure the actors facility is loaded before we load the remote service
+ if(config.getBool("akka.remote.server.service", true)){
+ log.info("Initializing Remote Actors Service...")
+ startRemoteService
+ log.info("Remote Actors Service initialized!")
+ }
+ }
+
+ abstract override def onUnload = {
+ super.onUnload
+
+ if (remoteServerThread.isAlive) {
+ log.info("Shutting down Remote Actors Service")
+ RemoteNode.shutdown
+ remoteServerThread.join(1000)
+ }
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala
index 03708069f4..9e06d6d267 100644
--- a/akka-core/src/main/scala/remote/Cluster.scala
+++ b/akka-core/src/main/scala/remote/Cluster.scala
@@ -151,7 +151,7 @@ class JGroupsClusterActor extends ClusterActor {
def receive = {
case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
- log debug ("Zombie: %s", x)
+ log debug ("Killing Zombie Node: %s", x)
broadcast(x :: Nil, PapersPlease)
remotes = remotes - x
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
index 49cc8c0d0a..cd856b64df 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java
@@ -4,7 +4,7 @@ public class PersistenceManager {
private static volatile boolean isRunning = false;
public static void init() {
if (!isRunning) {
- se.scalablesolutions.akka.Kernel.startRemoteService();
+ se.scalablesolutions.akka.Kernel$.MODULE$.startRemoteService();
isRunning = true;
}
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
index 4eb17ce7e6..bae492d0e1 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
@@ -60,7 +60,7 @@ public class RestTest extends TestCase {
*/
private static SelectorThread startJersey() {
try {
- Servlet servlet = new se.scalablesolutions.akka.AkkaServlet();
+ Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet();
ServletAdapter adapter = new ServletAdapter();
adapter.setServletInstance(servlet);
adapter.setContextPath(URI.getPath());
diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml
index c7d0c3134e..c32504f5a1 100755
--- a/akka-kernel/pom.xml
+++ b/akka-kernel/pom.xml
@@ -47,8 +47,18 @@
${project.groupId}
${project.version}
+
+ akka-comet
+ ${project.groupId}
+ ${project.version}
+
+
+ com.sun.jersey
+ jersey-server
+ ${jersey.version}
+
org.atmosphere
atmosphere-annotations
@@ -88,7 +98,7 @@
- se.scalablesolutions.akka.Kernel
+ se.scalablesolutions.akka.Main
diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala
index 8d684c4619..86f6226c5a 100644
--- a/akka-kernel/src/main/scala/Kernel.scala
+++ b/akka-kernel/src/main/scala/Kernel.scala
@@ -4,17 +4,16 @@
package se.scalablesolutions.akka
-import com.sun.grizzly.http.SelectorThread
-import com.sun.grizzly.http.servlet.ServletAdapter
-import com.sun.grizzly.standalone.StaticStreamAlgorithm
-
-import javax.ws.rs.core.UriBuilder
-import java.io.File
-import java.net.URLClassLoader
-
-import se.scalablesolutions.akka.remote.RemoteNode
+import se.scalablesolutions.akka.comet.BootableCometActorService
+import se.scalablesolutions.akka.remote.{RemoteNode,BootableRemoteActorService}
import se.scalablesolutions.akka.util.Logging
-import se.scalablesolutions.akka.actor.ActorRegistry
+import se.scalablesolutions.akka.actor.{ActorRegistry,BootableActorLoaderService}
+
+import javax.servlet.{ServletContextListener,ServletContextEvent}
+
+object Main {
+ def main(args: Array[String]) = Kernel.boot
+}
/**
* The Akka Kernel.
@@ -22,26 +21,13 @@ import se.scalablesolutions.akka.actor.ActorRegistry
* @author Jonas Bonér
*/
object Kernel extends Logging {
- import Config._
-
- val BOOT_CLASSES = config.getList("akka.boot")
- val RUN_REMOTE_SERVICE = config.getBool("akka.remote.server.service", true)
- val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
- val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
- val REST_URL = "http://" + REST_HOSTNAME
- val REST_PORT = config.getInt("akka.rest.port", 9998)
-
// FIXME add API to shut server down gracefully
@volatile private var hasBooted = false
- private var jerseySelectorThread: Option[SelectorThread] = None
+
private val startTime = System.currentTimeMillis
- private var applicationLoader: Option[ClassLoader] = None
-
- private lazy val remoteServerThread = new Thread(new Runnable() {
- def run = RemoteNode.start(applicationLoader)
- }, "Akka Remote Service")
-
- def main(args: Array[String]) = boot
+
+ //Bundles is what modules are to be loaded with the Kernel, this uses Jonas' AOP style mixin pattern
+ object Bundles extends BootableActorLoaderService with BootableRemoteActorService with BootableCometActorService
/**
* Boots up the Kernel.
@@ -56,89 +42,26 @@ object Kernel extends Logging {
if (!hasBooted) {
if (withBanner) printBanner
log.info("Starting Akka...")
-
- runApplicationBootClasses
- if (RUN_REMOTE_SERVICE) startRemoteService
- if (RUN_REST_SERVICE) startREST
-
+ Bundles.onLoad
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
log.info("Akka started successfully")
hasBooted = true
}
}
- // TODO document Kernel.shutdown
+ /**
+ * Shuts down the kernel, unloads all of the bundles
+ */
def shutdown = synchronized {
if (hasBooted) {
log.info("Shutting down Akka...")
- ActorRegistry.shutdownAll
- if (jerseySelectorThread.isDefined) {
- log.info("Shutting down REST service (Jersey)")
- jerseySelectorThread.get.stopEndpoint
- }
- if (remoteServerThread.isAlive) {
- log.info("Shutting down remote service")
- RemoteNode.shutdown
- remoteServerThread.join(1000)
- }
+ Bundles.onUnload
log.info("Akka succesfully shut down")
}
}
- def startRemoteService = remoteServerThread.start
-
- def startREST = {
- val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
-
- val scheme = uri.getScheme
- if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException(
- "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'")
-
- val adapter = new ServletAdapter
- adapter.setHandleStaticResources(true)
- adapter.setServletInstance(new AkkaCometServlet)
- adapter.setContextPath(uri.getPath)
- //Using autodetection for now
- adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport")
- if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root")
- log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath)
-
- val ah = new com.sun.grizzly.arp.DefaultAsyncHandler
- ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter)
- jerseySelectorThread = Some(new SelectorThread)
- jerseySelectorThread.get.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName)
- jerseySelectorThread.get.setPort(REST_PORT)
- jerseySelectorThread.get.setAdapter(adapter)
- jerseySelectorThread.get.setEnableAsyncExecution(true)
- jerseySelectorThread.get.setAsyncHandler(ah)
- jerseySelectorThread.get.listen
-
- log.info("REST service started successfully. Listening to port [%s]", REST_PORT)
- }
-
- private def runApplicationBootClasses = {
- val loader =
- if (HOME.isDefined) {
- val CONFIG = HOME.get + "/config"
- val DEPLOY = HOME.get + "/deploy"
- val DEPLOY_DIR = new File(DEPLOY)
- if (!DEPLOY_DIR.exists) {
- log.error("Could not find a deploy directory at [%s]", DEPLOY)
- System.exit(-1)
- }
- val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
- log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
- new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
- } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
- getClass.getClassLoader
- } else throw new IllegalStateException(
- "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
- for (clazz <- BOOT_CLASSES) {
- log.info("Loading boot class [%s]", clazz)
- loader.loadClass(clazz).newInstance
- }
- applicationLoader = Some(loader)
- }
+ //FIXME This is only being called by a test
+ def startRemoteService = Bundles.startRemoteService
private def printBanner = {
log.info(
@@ -151,7 +74,16 @@ object Kernel extends Logging {
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
""")
- log.info(" Running version %s", VERSION)
+ log.info(" Running version %s", Config.VERSION)
log.info("==============================")
}
-}
\ No newline at end of file
+}
+
+ /*
+ And this one can be added to web.xml mappings as a listener to boot and shutdown Akka
+ */
+
+class Kernel extends ServletContextListener {
+ def contextDestroyed(e : ServletContextEvent) : Unit = Kernel.shutdown
+ def contextInitialized(e : ServletContextEvent) : Unit = Kernel.boot
+ }
\ No newline at end of file
diff --git a/akka-rest/pom.xml b/akka-rest/pom.xml
index 32fbad0997..c354831cd4 100644
--- a/akka-rest/pom.xml
+++ b/akka-rest/pom.xml
@@ -23,11 +23,16 @@
${project.version}
-
+
- com.sun.grizzly
- grizzly-comet-webserver
- ${grizzly.version}
+ javax.servlet
+ servlet-api
+ 2.5
+
+
+ com.sun.jersey
+ jersey-core
+ ${jersey.version}
com.sun.jersey
diff --git a/akka-rest/src/main/scala/AkkaServlet.scala b/akka-rest/src/main/scala/AkkaServlet.scala
new file mode 100755
index 0000000000..4661ab5c14
--- /dev/null
+++ b/akka-rest/src/main/scala/AkkaServlet.scala
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.rest
+
+import se.scalablesolutions.akka.config.ConfiguratorRepository
+
+import com.sun.jersey.api.core.ResourceConfig
+import com.sun.jersey.spi.container.servlet.ServletContainer
+import com.sun.jersey.spi.container.WebApplication
+
+/**
+ * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container,
+ * e.g. not using the Akka Kernel.
+ *
+ * @author Jonas Bonér
+ */
+class AkkaServlet extends ServletContainer {
+ import org.scala_tools.javautils.Imports._
+
+ override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = {
+ //Kernel.boot // will boot if not already booted by 'main'
+
+ val configurators = ConfiguratorRepository.getConfigurators
+
+ resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava)
+ resourceConfig.getProperties.put(
+ "com.sun.jersey.spi.container.ResourceFilters",
+ Config.config.getList("akka.rest.filters").mkString(","))
+
+ webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators))
+ }
+}
\ No newline at end of file
diff --git a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml
index c95dc59b2c..23348604bb 100755
--- a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml
+++ b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml
@@ -13,7 +13,7 @@
AkkaServlet
- se.scalablesolutions.akka.AkkaServlet
+ se.scalablesolutions.akka.rest.AkkaServlet
AkkaServlet
diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala
index f657b502db..1833549a77 100644
--- a/akka-samples-scala/src/main/scala/SimpleService.scala
+++ b/akka-samples-scala/src/main/scala/SimpleService.scala
@@ -8,13 +8,14 @@ import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
+import se.scalablesolutions.akka.comet.{AkkaClusterBroadcastFilter}
import java.lang.Integer
import java.nio.ByteBuffer
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
-import org.atmosphere.annotation.{Broadcast, Suspend}
+import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
import org.atmosphere.util.XSSHtmlFilter
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
import org.atmosphere.jersey.Broadcastable
@@ -85,6 +86,7 @@ class PubSub extends Actor {
@Broadcast
@Path("/topic/{topic}/{message}/")
@Produces(Array("text/plain;charset=ISO-8859-1"))
+ @Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "foo" }
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
def receive = { case _ => }
@@ -149,9 +151,10 @@ class Chat extends Actor {
case x => log.info("recieve unknown: " + x)
}
- @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
- @Consumes(Array("application/x-www-form-urlencoded"))
@POST
+ @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
+ @Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "bar" }
+ @Consumes(Array("application/x-www-form-urlencoded"))
@Produces(Array("text/html"))
def publishMessage(form: MultivaluedMap[String, String]) =
(this !! Chat(form.getFirst("name"),
diff --git a/akka-samples-security/src/main/webapp/WEB-INF/web.xml b/akka-samples-security/src/main/webapp/WEB-INF/web.xml
index d0dfea26a8..02a6f7d918 100644
--- a/akka-samples-security/src/main/webapp/WEB-INF/web.xml
+++ b/akka-samples-security/src/main/webapp/WEB-INF/web.xml
@@ -8,7 +8,7 @@
AkkaServlet
- se.scalablesolutions.akka.AkkaServlet
+ se.scalablesolutions.akka.rest.AkkaServlet
diff --git a/akka-util/src/main/scala/Bootable.scala b/akka-util/src/main/scala/Bootable.scala
new file mode 100644
index 0000000000..17de26f2fd
--- /dev/null
+++ b/akka-util/src/main/scala/Bootable.scala
@@ -0,0 +1,10 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.util
+
+trait Bootable {
+ def onLoad : Unit = ()
+ def onUnload : Unit = ()
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a06944ec70..5e50980fe1 100755
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
akka-persistence-cassandra
akka-persistence-mongo
akka-rest
+ akka-comet
akka-amqp
akka-security
akka-kernel