From b2dc4680951e7b198e3e918aa85b30d67e6a6ace Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Dec 2009 19:05:16 +0100 Subject: [PATCH 01/11] Christmas cleaning --- akka-comet/pom.xml | 89 +++++++++++++++ .../src/main/scala/AkkaCometServlet.scala | 44 ++------ .../scala/BootableCometActorService.scala | 68 ++++++++++++ .../actor/BootableActorLoaderService.scala | 50 +++++++++ .../remote/BootableRemoteActorService.scala | 35 ++++++ .../scalablesolutions/akka/api/RestTest.java | 2 +- akka-kernel/pom.xml | 5 + akka-kernel/src/main/scala/Kernel.scala | 101 ++---------------- akka-rest/pom.xml | 13 ++- akka-rest/src/main/scala/AkkaServlet.scala | 34 ++++++ .../src/main/webapp/WEB-INF/web.xml | 2 +- .../src/main/webapp/WEB-INF/web.xml | 2 +- akka-util/src/main/scala/Bootable.scala | 10 ++ pom.xml | 1 + 14 files changed, 321 insertions(+), 135 deletions(-) create mode 100644 akka-comet/pom.xml rename akka-kernel/src/main/scala/AkkaServlet.scala => akka-comet/src/main/scala/AkkaCometServlet.scala (66%) create mode 100644 akka-comet/src/main/scala/BootableCometActorService.scala create mode 100644 akka-core/src/main/scala/actor/BootableActorLoaderService.scala create mode 100644 akka-core/src/main/scala/remote/BootableRemoteActorService.scala create mode 100755 akka-rest/src/main/scala/AkkaServlet.scala create mode 100644 akka-util/src/main/scala/Bootable.scala diff --git a/akka-comet/pom.xml b/akka-comet/pom.xml new file mode 100644 index 0000000000..778940d242 --- /dev/null +++ b/akka-comet/pom.xml @@ -0,0 +1,89 @@ + + 4.0.0 + + akka-comet + Akka Comet Module + + jar + + + akka + se.scalablesolutions.akka + 0.6 + ../pom.xml + + + + + + akka-core + ${project.groupId} + ${project.version} + + + akka-rest + ${project.groupId} + ${project.version} + + + + + com.sun.grizzly + grizzly-comet-webserver + ${grizzly.version} + + + + + javax.servlet + servlet-api + 2.5 + + + org.atmosphere + atmosphere-annotations + ${atmosphere.version} + + + org.atmosphere + atmosphere-jersey + ${atmosphere.version} + + + org.atmosphere + atmosphere-runtime + ${atmosphere.version} + + + + + diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-comet/src/main/scala/AkkaCometServlet.scala similarity index 66% rename from akka-kernel/src/main/scala/AkkaServlet.scala rename to akka-comet/src/main/scala/AkkaCometServlet.scala index 8c9fdba4c2..b56ff43fa3 100755 --- a/akka-kernel/src/main/scala/AkkaServlet.scala +++ b/akka-comet/src/main/scala/AkkaCometServlet.scala @@ -2,48 +2,20 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka +package se.scalablesolutions.akka.comet -import se.scalablesolutions.akka.config.ConfiguratorRepository -import se.scalablesolutions.akka.rest.ActorComponentProviderFactory +import se.scalablesolutions.akka.rest.AkkaServlet import se.scalablesolutions.akka.util.Logging -import com.sun.jersey.api.core.ResourceConfig -import com.sun.jersey.spi.container.servlet.ServletContainer -import com.sun.jersey.spi.container.WebApplication - import java.util.{List => JList} import javax.servlet.{ServletConfig} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - -import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} import org.atmosphere.container.{GrizzlyCometSupport} +import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} import org.atmosphere.jersey.JerseyBroadcaster -/** - * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container, - * e.g. not using the Akka Kernel. - * - * @author Jonas Bonér - */ -class AkkaServlet extends ServletContainer { - import org.scala_tools.javautils.Imports._ - - override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { - Kernel.boot // will boot if not already booted by 'main' - - val configurators = ConfiguratorRepository.getConfigurators - - resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava) - resourceConfig.getProperties.put( - "com.sun.jersey.spi.container.ResourceFilters", - Config.config.getList("akka.rest.filters").mkString(",")) - - webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators)) - } -} /** * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a @@ -79,19 +51,19 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster)) } - override def createCometSupportResolver() : CometSupportResolver = { + override def createCometSupportResolver() : CometSupportResolver = { import org.scala_tools.javautils.Imports._ - + new DefaultCometSupportResolver(config) { type CS = CometSupport[_ <: AtmosphereResource[_,_]] override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = { available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match { - case Nil => new GrizzlyCometSupport(config) + case Nil => new GrizzlyCometSupport(config) case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]]) - case _ => super.resolveMultipleNativeSupportConflict(available) + case _ => super.resolveMultipleNativeSupportConflict(available) } } - + override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = { val predef = config.getInitParameter("cometSupport") if(testClassExists(predef)) diff --git a/akka-comet/src/main/scala/BootableCometActorService.scala b/akka-comet/src/main/scala/BootableCometActorService.scala new file mode 100644 index 0000000000..84bb52a14e --- /dev/null +++ b/akka-comet/src/main/scala/BootableCometActorService.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.comet + +import com.sun.grizzly.http.SelectorThread +import com.sun.grizzly.http.servlet.ServletAdapter +import com.sun.grizzly.standalone.StaticStreamAlgorithm + +import javax.ws.rs.core.UriBuilder +import se.scalablesolutions.akka.actor.BootableActorLoaderService +import se.scalablesolutions.akka.util.{Bootable,Logging} + + +trait BootableCometActorService extends Bootable with Logging { + self : BootableActorLoaderService => + + import Config._ + + val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost") + val REST_URL = "http://" + REST_HOSTNAME + val REST_PORT = config.getInt("akka.rest.port", 9998) + protected var jerseySelectorThread: Option[SelectorThread] = None + + abstract override def onLoad = { + super.onLoad + if(config.getBool("akka.rest.service", true)){ + + val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() + + val scheme = uri.getScheme + if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException( + "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") + + val adapter = new ServletAdapter + adapter.setHandleStaticResources(true) + adapter.setServletInstance(new AkkaCometServlet) + adapter.setContextPath(uri.getPath) + //Using autodetection for now + adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") + if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") + log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath) + + val ah = new com.sun.grizzly.arp.DefaultAsyncHandler + ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter) + jerseySelectorThread = Some(new SelectorThread).map { t => + t.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName) + t.setPort(REST_PORT) + t.setAdapter(adapter) + t.setEnableAsyncExecution(true) + t.setAsyncHandler(ah) + t.listen + t } + + log.info("REST service started successfully. Listening to port [%s]", REST_PORT) + } + } + + abstract override def onUnload = { + super.onUnload + + if (jerseySelectorThread.isDefined) { + log.info("Shutting down REST service (Jersey)") + jerseySelectorThread.get.stopEndpoint + } + } +} \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala new file mode 100644 index 0000000000..c4735a6e9c --- /dev/null +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.actor + +import java.io.File +import java.net.URLClassLoader +import se.scalablesolutions.akka.util.{Bootable,Logging} + +trait BootableActorLoaderService extends Bootable with Logging { + import Config._ + + val BOOT_CLASSES = config.getList("akka.boot") + var applicationLoader: Option[ClassLoader] = None + + protected def runApplicationBootClasses : Option[ClassLoader] = { + val loader = + if (HOME.isDefined) { + val CONFIG = HOME.get + "/config" + val DEPLOY = HOME.get + "/deploy" + val DEPLOY_DIR = new File(DEPLOY) + if (!DEPLOY_DIR.exists) { + log.error("Could not find a deploy directory at [%s]", DEPLOY) + System.exit(-1) + } + val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL + log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) + new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) + } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) { + getClass.getClassLoader + } else throw new IllegalStateException( + "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") + for (clazz <- BOOT_CLASSES) { + log.info("Loading boot class [%s]", clazz) + loader.loadClass(clazz).newInstance + } + + Some(loader) + } + + abstract override def onLoad = { + applicationLoader = runApplicationBootClasses + super.onLoad + } + + abstract override def onUnload = { + ActorRegistry.shutdownAll + } +} \ No newline at end of file diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala new file mode 100644 index 0000000000..bdee07944b --- /dev/null +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.remote + +import se.scalablesolutions.akka.actor.BootableActorLoaderService +import se.scalablesolutions.akka.util.{Bootable,Logging} + +trait BootableRemoteActorService extends Bootable with Logging { + self : BootableActorLoaderService => + + import Config._ + + protected lazy val remoteServerThread = new Thread(new Runnable() { + def run = RemoteNode.start(self.applicationLoader) + }, "Akka Remote Service") + + def startRemoteService = remoteServerThread.start + + abstract override def onLoad = { + super.onLoad + if(config.getBool("akka.remote.server.service", true)) startRemoteService + } + + abstract override def onUnload = { + super.onUnload + + if (remoteServerThread.isAlive) { + log.info("Shutting down remote service") + RemoteNode.shutdown + remoteServerThread.join(1000) + } + } +} \ No newline at end of file diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java index 4eb17ce7e6..bae492d0e1 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java @@ -60,7 +60,7 @@ public class RestTest extends TestCase { */ private static SelectorThread startJersey() { try { - Servlet servlet = new se.scalablesolutions.akka.AkkaServlet(); + Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet(); ServletAdapter adapter = new ServletAdapter(); adapter.setServletInstance(servlet); adapter.setContextPath(URI.getPath()); diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index c7d0c3134e..5eceb218f5 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -47,6 +47,11 @@ ${project.groupId} ${project.version} + + akka-comet + ${project.groupId} + ${project.version} + diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 8d684c4619..7cc7a9ce35 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -4,17 +4,10 @@ package se.scalablesolutions.akka -import com.sun.grizzly.http.SelectorThread -import com.sun.grizzly.http.servlet.ServletAdapter -import com.sun.grizzly.standalone.StaticStreamAlgorithm - -import javax.ws.rs.core.UriBuilder -import java.io.File -import java.net.URLClassLoader - -import se.scalablesolutions.akka.remote.RemoteNode +import se.scalablesolutions.akka.comet.BootableCometActorService +import se.scalablesolutions.akka.remote.{RemoteNode,BootableRemoteActorService} import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.actor.ActorRegistry +import se.scalablesolutions.akka.actor.{ActorRegistry,BootableActorLoaderService} /** * The Akka Kernel. @@ -24,22 +17,12 @@ import se.scalablesolutions.akka.actor.ActorRegistry object Kernel extends Logging { import Config._ - val BOOT_CLASSES = config.getList("akka.boot") - val RUN_REMOTE_SERVICE = config.getBool("akka.remote.server.service", true) - val RUN_REST_SERVICE = config.getBool("akka.rest.service", true) - val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost") - val REST_URL = "http://" + REST_HOSTNAME - val REST_PORT = config.getInt("akka.rest.port", 9998) - // FIXME add API to shut server down gracefully @volatile private var hasBooted = false - private var jerseySelectorThread: Option[SelectorThread] = None + private val startTime = System.currentTimeMillis - private var applicationLoader: Option[ClassLoader] = None - - private lazy val remoteServerThread = new Thread(new Runnable() { - def run = RemoteNode.start(applicationLoader) - }, "Akka Remote Service") + + object Bundles extends BootableActorLoaderService with BootableRemoteActorService with BootableCometActorService def main(args: Array[String]) = boot @@ -56,11 +39,7 @@ object Kernel extends Logging { if (!hasBooted) { if (withBanner) printBanner log.info("Starting Akka...") - - runApplicationBootClasses - if (RUN_REMOTE_SERVICE) startRemoteService - if (RUN_REST_SERVICE) startREST - + Bundles.onLoad Thread.currentThread.setContextClassLoader(getClass.getClassLoader) log.info("Akka started successfully") hasBooted = true @@ -71,74 +50,12 @@ object Kernel extends Logging { def shutdown = synchronized { if (hasBooted) { log.info("Shutting down Akka...") - ActorRegistry.shutdownAll - if (jerseySelectorThread.isDefined) { - log.info("Shutting down REST service (Jersey)") - jerseySelectorThread.get.stopEndpoint - } - if (remoteServerThread.isAlive) { - log.info("Shutting down remote service") - RemoteNode.shutdown - remoteServerThread.join(1000) - } + Bundles.onUnload log.info("Akka succesfully shut down") } } - def startRemoteService = remoteServerThread.start - - def startREST = { - val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() - - val scheme = uri.getScheme - if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException( - "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") - - val adapter = new ServletAdapter - adapter.setHandleStaticResources(true) - adapter.setServletInstance(new AkkaCometServlet) - adapter.setContextPath(uri.getPath) - //Using autodetection for now - adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") - if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") - log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath) - - val ah = new com.sun.grizzly.arp.DefaultAsyncHandler - ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter) - jerseySelectorThread = Some(new SelectorThread) - jerseySelectorThread.get.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName) - jerseySelectorThread.get.setPort(REST_PORT) - jerseySelectorThread.get.setAdapter(adapter) - jerseySelectorThread.get.setEnableAsyncExecution(true) - jerseySelectorThread.get.setAsyncHandler(ah) - jerseySelectorThread.get.listen - - log.info("REST service started successfully. Listening to port [%s]", REST_PORT) - } - - private def runApplicationBootClasses = { - val loader = - if (HOME.isDefined) { - val CONFIG = HOME.get + "/config" - val DEPLOY = HOME.get + "/deploy" - val DEPLOY_DIR = new File(DEPLOY) - if (!DEPLOY_DIR.exists) { - log.error("Could not find a deploy directory at [%s]", DEPLOY) - System.exit(-1) - } - val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL - log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) - new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) - } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) { - getClass.getClassLoader - } else throw new IllegalStateException( - "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") - for (clazz <- BOOT_CLASSES) { - log.info("Loading boot class [%s]", clazz) - loader.loadClass(clazz).newInstance - } - applicationLoader = Some(loader) - } + def startRemoteService = Bundles.startRemoteService private def printBanner = { log.info( diff --git a/akka-rest/pom.xml b/akka-rest/pom.xml index 32fbad0997..c354831cd4 100644 --- a/akka-rest/pom.xml +++ b/akka-rest/pom.xml @@ -23,11 +23,16 @@ ${project.version} - + - com.sun.grizzly - grizzly-comet-webserver - ${grizzly.version} + javax.servlet + servlet-api + 2.5 + + + com.sun.jersey + jersey-core + ${jersey.version} com.sun.jersey diff --git a/akka-rest/src/main/scala/AkkaServlet.scala b/akka-rest/src/main/scala/AkkaServlet.scala new file mode 100755 index 0000000000..4661ab5c14 --- /dev/null +++ b/akka-rest/src/main/scala/AkkaServlet.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.rest + +import se.scalablesolutions.akka.config.ConfiguratorRepository + +import com.sun.jersey.api.core.ResourceConfig +import com.sun.jersey.spi.container.servlet.ServletContainer +import com.sun.jersey.spi.container.WebApplication + +/** + * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container, + * e.g. not using the Akka Kernel. + * + * @author Jonas Bonér + */ +class AkkaServlet extends ServletContainer { + import org.scala_tools.javautils.Imports._ + + override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { + //Kernel.boot // will boot if not already booted by 'main' + + val configurators = ConfiguratorRepository.getConfigurators + + resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava) + resourceConfig.getProperties.put( + "com.sun.jersey.spi.container.ResourceFilters", + Config.config.getList("akka.rest.filters").mkString(",")) + + webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators)) + } +} \ No newline at end of file diff --git a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml index c95dc59b2c..23348604bb 100755 --- a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml +++ b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml @@ -13,7 +13,7 @@ AkkaServlet - se.scalablesolutions.akka.AkkaServlet + se.scalablesolutions.akka.rest.AkkaServlet AkkaServlet diff --git a/akka-samples-security/src/main/webapp/WEB-INF/web.xml b/akka-samples-security/src/main/webapp/WEB-INF/web.xml index d0dfea26a8..02a6f7d918 100644 --- a/akka-samples-security/src/main/webapp/WEB-INF/web.xml +++ b/akka-samples-security/src/main/webapp/WEB-INF/web.xml @@ -8,7 +8,7 @@ AkkaServlet - se.scalablesolutions.akka.AkkaServlet + se.scalablesolutions.akka.rest.AkkaServlet diff --git a/akka-util/src/main/scala/Bootable.scala b/akka-util/src/main/scala/Bootable.scala new file mode 100644 index 0000000000..17de26f2fd --- /dev/null +++ b/akka-util/src/main/scala/Bootable.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.util + +trait Bootable { + def onLoad : Unit = () + def onUnload : Unit = () +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 07647bb6d1..14b5791d76 100755 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ akka-persistence-cassandra akka-persistence-mongo akka-rest + akka-comet akka-amqp akka-security akka-kernel From 41a90d041d8bac29e5418bbf032f6e820dc6f9b3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Dec 2009 20:06:04 +0100 Subject: [PATCH 02/11] Added possibility to use Kernel as j2ee context listener --- akka-kernel/src/main/scala/Kernel.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 7cc7a9ce35..a312652cf8 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -9,6 +9,8 @@ import se.scalablesolutions.akka.remote.{RemoteNode,BootableRemoteActorService} import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.actor.{ActorRegistry,BootableActorLoaderService} +import javax.servlet.{ServletContextListener,ServletContextEvent} + /** * The Akka Kernel. * @@ -71,4 +73,13 @@ object Kernel extends Logging { log.info(" Running version %s", VERSION) log.info("==============================") } -} \ No newline at end of file +} + + /* + And this one can be added to web.xml mappings as a listener to boot and shutdown Akka + */ + +class Kernel extends ServletContextListener { + def contextDestroyed(e : ServletContextEvent) : Unit = Kernel.shutdown + def contextInitialized(e : ServletContextEvent) : Unit = Kernel.boot + } \ No newline at end of file From e163b4c68555cc793247e215a28c3c4d538508be Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Dec 2009 21:03:27 +0100 Subject: [PATCH 03/11] Added Kernel class for web kernel --- .../se/scalablesolutions/akka/api/PersistenceManager.java | 2 +- akka-kernel/src/main/scala/Kernel.scala | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java index 49cc8c0d0a..cd856b64df 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistenceManager.java @@ -4,7 +4,7 @@ public class PersistenceManager { private static volatile boolean isRunning = false; public static void init() { if (!isRunning) { - se.scalablesolutions.akka.Kernel.startRemoteService(); + se.scalablesolutions.akka.Kernel$.MODULE$.startRemoteService(); isRunning = true; } } diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index a312652cf8..cb1f84b4ca 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -17,8 +17,6 @@ import javax.servlet.{ServletContextListener,ServletContextEvent} * @author Jonas Bonér */ object Kernel extends Logging { - import Config._ - // FIXME add API to shut server down gracefully @volatile private var hasBooted = false @@ -70,7 +68,7 @@ object Kernel extends Logging { (____ /__|_ \__|_ \(____ / \/ \/ \/ \/ """) - log.info(" Running version %s", VERSION) + log.info(" Running version %s", Config.VERSION) log.info("==============================") } } From 194fc8606275c76db9c46c558504be310231f9fd Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 22 Dec 2009 21:29:15 +0100 Subject: [PATCH 04/11] Forgot to add the Main class --- akka-kernel/pom.xml | 2 +- akka-kernel/src/main/scala/Kernel.scala | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index 5eceb218f5..93acfec28d 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -93,7 +93,7 @@ - se.scalablesolutions.akka.Kernel + se.scalablesolutions.akka.Main diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index cb1f84b4ca..623ec86cc9 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -11,6 +11,10 @@ import se.scalablesolutions.akka.actor.{ActorRegistry,BootableActorLoaderService import javax.servlet.{ServletContextListener,ServletContextEvent} +object Main { + def main(args: Array[String]) = Kernel.boot +} + /** * The Akka Kernel. * @@ -24,8 +28,6 @@ object Kernel extends Logging { object Bundles extends BootableActorLoaderService with BootableRemoteActorService with BootableCometActorService - def main(args: Array[String]) = boot - /** * Boots up the Kernel. */ From 429ce066c40d36e4e7a28395ce678fadb4e96526 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Dec 2009 21:37:39 +0100 Subject: [PATCH 05/11] Experimenting with Comet cluster support --- .../scala/AkkaClusterBroadcastFilter.scala | 37 +++++++++++++++++++ .../scala/BootableCometActorService.scala | 2 + akka-kernel/pom.xml | 5 +++ .../src/main/scala/SimpleService.scala | 9 +++-- 4 files changed, 50 insertions(+), 3 deletions(-) create mode 100644 akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala new file mode 100644 index 0000000000..1c79f09477 --- /dev/null +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.comet + +import se.scalablesolutions.akka.util.{Logging} +import se.scalablesolutions.akka.actor.{Actor} +import se.scalablesolutions.akka.remote.{Cluster} +import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster} +import scala.reflect.{BeanProperty} + +sealed trait AkkaClusterBroadcastMessage +case class BroadcastMessage(val name : String, val msg : AnyRef) extends AkkaClusterBroadcastMessage + +class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] with Logging { + @BeanProperty var clusterName = "" + @BeanProperty var broadcaster : Broadcaster = null + + override def init = () + + def destroy = () + + def filter(o : AnyRef) : AnyRef = { + Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) + log.info("filter invoked for message [%s], message was forwarded to cluster",o) + o + } + + def receive = { + case BroadcastMessage(clusterName,m) if broadcaster ne null => { + log.info("Receiving remote message, broadcasting it to listeners: [%s]",m) + broadcaster broadcast m + } + case x => log.info("Not a valid message for cluster[%s] = [%s]",clusterName,x) + } +} \ No newline at end of file diff --git a/akka-comet/src/main/scala/BootableCometActorService.scala b/akka-comet/src/main/scala/BootableCometActorService.scala index 84bb52a14e..90c8db9544 100644 --- a/akka-comet/src/main/scala/BootableCometActorService.scala +++ b/akka-comet/src/main/scala/BootableCometActorService.scala @@ -32,6 +32,8 @@ trait BootableCometActorService extends Bootable with Logging { val scheme = uri.getScheme if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException( "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") + + log.info("Attempting to start REST service on uri [%s]",uri) val adapter = new ServletAdapter adapter.setHandleStaticResources(true) diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index 93acfec28d..c32504f5a1 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -54,6 +54,11 @@ + + com.sun.jersey + jersey-server + ${jersey.version} + org.atmosphere atmosphere-annotations diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala index 11e8304551..20b8d4a280 100644 --- a/akka-samples-scala/src/main/scala/SimpleService.scala +++ b/akka-samples-scala/src/main/scala/SimpleService.scala @@ -8,13 +8,14 @@ import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor} import se.scalablesolutions.akka.state.{CassandraStorage, TransactionalState} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.comet.{AkkaClusterBroadcastFilter} import java.lang.Integer import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} -import org.atmosphere.annotation.{Broadcast, Suspend} +import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} import org.atmosphere.jersey.Broadcastable @@ -85,6 +86,7 @@ class PubSub extends Actor { @Broadcast @Path("/topic/{topic}/{message}/") @Produces(Array("text/plain;charset=ISO-8859-1")) + @Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "foo" } def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) def receive = { case _ => } @@ -150,9 +152,10 @@ class Chat extends Transactor { case x => log.info("recieve unknown: " + x) } - @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) - @Consumes(Array("application/x-www-form-urlencoded")) @POST + @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) + @Cluster(Array(classOf[AkkaClusterBroadcastFilter])) { val name = "bar" } + @Consumes(Array("application/x-www-form-urlencoded")) @Produces(Array("text/html")) def publishMessage(form: MultivaluedMap[String, String]) = (this !! Chat(form.getFirst("name"), From bea3254ccc0d5db19b5c27fa5475c0bc75358696 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Dec 2009 23:22:09 +0100 Subject: [PATCH 06/11] Tweaking --- .../scala/AkkaClusterBroadcastFilter.scala | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala index 1c79f09477..8187e7ce81 100644 --- a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -17,21 +17,34 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe @BeanProperty var clusterName = "" @BeanProperty var broadcaster : Broadcaster = null - override def init = () + override def init : Unit = start - def destroy = () + def destroy : Unit = stop def filter(o : AnyRef) : AnyRef = { - Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) - log.info("filter invoked for message [%s], message was forwarded to cluster",o) - o + o match { + case bm@BroadcastMessage(_,m) => { + log.info("filter invoked for message [%s], message shouldn't be forwarded to cluster",o) + m + } + + case _ => { + Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) + log.info("filter invoked for message [%s], message was forwarded to cluster",o) + o + } + } } def receive = { - case BroadcastMessage(clusterName,m) if broadcaster ne null => { + case bm@BroadcastMessage(c,m) if (c == clusterName) && (broadcaster ne null) => { log.info("Receiving remote message, broadcasting it to listeners: [%s]",m) - broadcaster broadcast m + broadcaster broadcast bm } case x => log.info("Not a valid message for cluster[%s] = [%s]",clusterName,x) } -} \ No newline at end of file +} + +/*class AkkaBroadcaster extends JerseyBroadcaster { + super.bc.addFilter() +}*/ \ No newline at end of file From 5ed2c718c5628442fa235175deb17a4a28900653 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 24 Dec 2009 00:44:36 +0100 Subject: [PATCH 07/11] Got it working! --- akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala index 8187e7ce81..aae15c3cb2 100644 --- a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -17,7 +17,7 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe @BeanProperty var clusterName = "" @BeanProperty var broadcaster : Broadcaster = null - override def init : Unit = start + override def init : Unit = () def destroy : Unit = stop @@ -43,6 +43,8 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe } case x => log.info("Not a valid message for cluster[%s] = [%s]",clusterName,x) } + + start //Make sure it gets started when instantiated } /*class AkkaBroadcaster extends JerseyBroadcaster { From 6169955a1d146c7e1c6fa3c6da9a08f1e869c79b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 24 Dec 2009 14:20:48 +0100 Subject: [PATCH 08/11] Cleaned up the code --- .../scala/AkkaClusterBroadcastFilter.scala | 63 ++++++++----------- 1 file changed, 26 insertions(+), 37 deletions(-) diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala index aae15c3cb2..43b87c27f8 100644 --- a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -4,7 +4,6 @@ package se.scalablesolutions.akka.comet -import se.scalablesolutions.akka.util.{Logging} import se.scalablesolutions.akka.actor.{Actor} import se.scalablesolutions.akka.remote.{Cluster} import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster} @@ -13,40 +12,30 @@ import scala.reflect.{BeanProperty} sealed trait AkkaClusterBroadcastMessage case class BroadcastMessage(val name : String, val msg : AnyRef) extends AkkaClusterBroadcastMessage -class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] with Logging { - @BeanProperty var clusterName = "" - @BeanProperty var broadcaster : Broadcaster = null - - override def init : Unit = () - - def destroy : Unit = stop - - def filter(o : AnyRef) : AnyRef = { - o match { - case bm@BroadcastMessage(_,m) => { - log.info("filter invoked for message [%s], message shouldn't be forwarded to cluster",o) - m - } - - case _ => { - Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) - log.info("filter invoked for message [%s], message was forwarded to cluster",o) - o - } - } - } - - def receive = { - case bm@BroadcastMessage(c,m) if (c == clusterName) && (broadcaster ne null) => { - log.info("Receiving remote message, broadcasting it to listeners: [%s]",m) - broadcaster broadcast bm - } - case x => log.info("Not a valid message for cluster[%s] = [%s]",clusterName,x) - } - - start //Make sure it gets started when instantiated -} +class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] { + @BeanProperty var clusterName = "" + @BeanProperty var broadcaster : Broadcaster = null -/*class AkkaBroadcaster extends JerseyBroadcaster { - super.bc.addFilter() -}*/ \ No newline at end of file + override def init : Unit = () + + def destroy : Unit = stop + + def filter(o : AnyRef) : AnyRef = { + o match { + case BroadcastMessage(_,m) => m + + case _ => { + Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) + o + } + } + } + + def receive = { + case bm@BroadcastMessage(c,m) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast bm + case _ => + } + + //Since this class is instantiated by Atmosphere, we need to make sure it's started + start +} \ No newline at end of file From 1d62c87867e8ad45d4429ca40b48517f195f64ed Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 24 Dec 2009 14:30:49 +0100 Subject: [PATCH 09/11] Additional tidying --- .../src/main/scala/AkkaClusterBroadcastFilter.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala index 43b87c27f8..014bb8d64d 100644 --- a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -9,8 +9,8 @@ import se.scalablesolutions.akka.remote.{Cluster} import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster} import scala.reflect.{BeanProperty} -sealed trait AkkaClusterBroadcastMessage -case class BroadcastMessage(val name : String, val msg : AnyRef) extends AkkaClusterBroadcastMessage +sealed trait AkkaClusterCometBroadcastMessage +case class BroadcastMessage(val name : String, val msg : AnyRef) extends AkkaClusterCometBroadcastMessage class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] { @BeanProperty var clusterName = "" @@ -27,12 +27,12 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe case _ => { Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) o - } } - } + } + } def receive = { - case bm@BroadcastMessage(c,m) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast bm + case bm@BroadcastMessage(c,_) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast bm case _ => } From 2e7b74986ba7f6c4821c8a4852c823a269d230e4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 24 Dec 2009 17:11:57 +0100 Subject: [PATCH 10/11] Some renaming and some comments --- .../scala/AkkaClusterBroadcastFilter.scala | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala index 014bb8d64d..b517dc0194 100644 --- a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -9,8 +9,8 @@ import se.scalablesolutions.akka.remote.{Cluster} import org.atmosphere.cpr.{ClusterBroadcastFilter,Broadcaster} import scala.reflect.{BeanProperty} -sealed trait AkkaClusterCometBroadcastMessage -case class BroadcastMessage(val name : String, val msg : AnyRef) extends AkkaClusterCometBroadcastMessage +sealed trait ClusterCometMessageType +case class ClusterCometBroadcast(val name : String, val msg : AnyRef) extends ClusterCometMessageType class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] { @BeanProperty var clusterName = "" @@ -20,19 +20,18 @@ class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRe def destroy : Unit = stop - def filter(o : AnyRef) : AnyRef = { - o match { - case BroadcastMessage(_,m) => m - - case _ => { - Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],BroadcastMessage(clusterName,o)) - o - } + def filter(o : AnyRef) : AnyRef = o match { + case ClusterCometBroadcast(_,m) => m //Do not re-broadcast, just unbox and pass along + + case m : AnyRef => { //Relay message to the cluster and pass along + Cluster.relayMessage(classOf[AkkaClusterBroadcastFilter],ClusterCometBroadcast(clusterName,m)) + m } } def receive = { - case bm@BroadcastMessage(c,_) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast bm + //Only handle messages intended for this particular instance + case b@ClusterCometBroadcast(c,_) if (c == clusterName) && (broadcaster ne null) => broadcaster broadcast b case _ => } From ecc64064e2117d88f22a6e58467ffa433caf1559 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 26 Dec 2009 15:09:44 +0100 Subject: [PATCH 11/11] Adding docs --- .../main/scala/AkkaClusterBroadcastFilter.scala | 8 ++++++++ akka-comet/src/main/scala/AkkaCometServlet.scala | 8 ++++++++ .../main/scala/BootableCometActorService.scala | 6 ++++-- .../scala/actor/BootableActorLoaderService.scala | 3 +++ .../scala/remote/BootableRemoteActorService.scala | 15 ++++++++++++--- akka-core/src/main/scala/remote/Cluster.scala | 2 +- akka-kernel/src/main/scala/Kernel.scala | 6 +++++- 7 files changed, 41 insertions(+), 7 deletions(-) diff --git a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala index b517dc0194..5c98501978 100644 --- a/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala +++ b/akka-comet/src/main/scala/AkkaClusterBroadcastFilter.scala @@ -12,6 +12,14 @@ import scala.reflect.{BeanProperty} sealed trait ClusterCometMessageType case class ClusterCometBroadcast(val name : String, val msg : AnyRef) extends ClusterCometMessageType +/** + * Enables explicit clustering of Atmosphere (Comet) resources + * Annotate the endpoint which has the @Broadcast annotation with + * @org.atmosphere.annotation.Cluster(Array(classOf[AkkClusterBroadcastFilter])){ name = "someUniqueName" } + * that's all folks! + * Note: In the future, clustering comet will be transparent + */ + class AkkaClusterBroadcastFilter extends Actor with ClusterBroadcastFilter[AnyRef] { @BeanProperty var clusterName = "" @BeanProperty var broadcaster : Broadcaster = null diff --git a/akka-comet/src/main/scala/AkkaCometServlet.scala b/akka-comet/src/main/scala/AkkaCometServlet.scala index b56ff43fa3..9b8fa06306 100755 --- a/akka-comet/src/main/scala/AkkaCometServlet.scala +++ b/akka-comet/src/main/scala/AkkaCometServlet.scala @@ -46,11 +46,19 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging } } + /** + * We override this to avoid Atmosphere looking for it's atmosphere.xml file + * Instead we specify what semantics we want in code. + */ override def loadConfiguration(sc: ServletConfig) { config = new AtmosphereConfig { supportSession = false } atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster)) } + /** + * This method is overridden because Akka Kernel is bundles with Grizzly, so if we deploy the Kernel in another container, + * we need to handle that. + */ override def createCometSupportResolver() : CometSupportResolver = { import org.scala_tools.javautils.Imports._ diff --git a/akka-comet/src/main/scala/BootableCometActorService.scala b/akka-comet/src/main/scala/BootableCometActorService.scala index 90c8db9544..67d85ea1cc 100644 --- a/akka-comet/src/main/scala/BootableCometActorService.scala +++ b/akka-comet/src/main/scala/BootableCometActorService.scala @@ -12,7 +12,9 @@ import javax.ws.rs.core.UriBuilder import se.scalablesolutions.akka.actor.BootableActorLoaderService import se.scalablesolutions.akka.util.{Bootable,Logging} - +/** + * Handles the Akka Comet Support (load/unload) + */ trait BootableCometActorService extends Bootable with Logging { self : BootableActorLoaderService => @@ -40,7 +42,7 @@ trait BootableCometActorService extends Bootable with Logging { adapter.setServletInstance(new AkkaCometServlet) adapter.setContextPath(uri.getPath) //Using autodetection for now - adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") + //adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath) diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index c4735a6e9c..ef6d8f7eab 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -8,6 +8,9 @@ import java.io.File import java.net.URLClassLoader import se.scalablesolutions.akka.util.{Bootable,Logging} +/** + * Handles all modules in the deploy directory (load and unload) + */ trait BootableActorLoaderService extends Bootable with Logging { import Config._ diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index bdee07944b..38ed31651e 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -7,6 +7,11 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.actor.BootableActorLoaderService import se.scalablesolutions.akka.util.{Bootable,Logging} +/** + * This bundle/service is responsible for booting up and shutting down the remote actors facility + * It's used in Kernel + */ + trait BootableRemoteActorService extends Bootable with Logging { self : BootableActorLoaderService => @@ -19,15 +24,19 @@ trait BootableRemoteActorService extends Bootable with Logging { def startRemoteService = remoteServerThread.start abstract override def onLoad = { - super.onLoad - if(config.getBool("akka.remote.server.service", true)) startRemoteService + super.onLoad //Make sure the actors facility is loaded before we load the remote service + if(config.getBool("akka.remote.server.service", true)){ + log.info("Initializing Remote Actors Service...") + startRemoteService + log.info("Remote Actors Service initialized!") + } } abstract override def onUnload = { super.onUnload if (remoteServerThread.isAlive) { - log.info("Shutting down remote service") + log.info("Shutting down Remote Actors Service") RemoteNode.shutdown remoteServerThread.join(1000) } diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 03708069f4..9e06d6d267 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -151,7 +151,7 @@ class JGroupsClusterActor extends ClusterActor { def receive = { case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead - log debug ("Zombie: %s", x) + log debug ("Killing Zombie Node: %s", x) broadcast(x :: Nil, PapersPlease) remotes = remotes - x } diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 623ec86cc9..86f6226c5a 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -26,6 +26,7 @@ object Kernel extends Logging { private val startTime = System.currentTimeMillis + //Bundles is what modules are to be loaded with the Kernel, this uses Jonas' AOP style mixin pattern object Bundles extends BootableActorLoaderService with BootableRemoteActorService with BootableCometActorService /** @@ -48,7 +49,9 @@ object Kernel extends Logging { } } - // TODO document Kernel.shutdown + /** + * Shuts down the kernel, unloads all of the bundles + */ def shutdown = synchronized { if (hasBooted) { log.info("Shutting down Akka...") @@ -57,6 +60,7 @@ object Kernel extends Logging { } } + //FIXME This is only being called by a test def startRemoteService = Bundles.startRemoteService private def printBanner = {