diff --git a/akka-comet/pom.xml b/akka-comet/pom.xml new file mode 100644 index 0000000000..778940d242 --- /dev/null +++ b/akka-comet/pom.xml @@ -0,0 +1,89 @@ + + 4.0.0 + + akka-comet + Akka Comet Module + + jar + + + akka + se.scalablesolutions.akka + 0.6 + ../pom.xml + + + + + + akka-core + ${project.groupId} + ${project.version} + + + akka-rest + ${project.groupId} + ${project.version} + + + + + com.sun.grizzly + grizzly-comet-webserver + ${grizzly.version} + + + + + javax.servlet + servlet-api + 2.5 + + + org.atmosphere + atmosphere-annotations + ${atmosphere.version} + + + org.atmosphere + atmosphere-jersey + ${atmosphere.version} + + + org.atmosphere + atmosphere-runtime + ${atmosphere.version} + + + + + diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-comet/src/main/scala/AkkaCometServlet.scala similarity index 66% rename from akka-kernel/src/main/scala/AkkaServlet.scala rename to akka-comet/src/main/scala/AkkaCometServlet.scala index 8c9fdba4c2..b56ff43fa3 100755 --- a/akka-kernel/src/main/scala/AkkaServlet.scala +++ b/akka-comet/src/main/scala/AkkaCometServlet.scala @@ -2,48 +2,20 @@ * Copyright (C) 2009 Scalable Solutions. */ -package se.scalablesolutions.akka +package se.scalablesolutions.akka.comet -import se.scalablesolutions.akka.config.ConfiguratorRepository -import se.scalablesolutions.akka.rest.ActorComponentProviderFactory +import se.scalablesolutions.akka.rest.AkkaServlet import se.scalablesolutions.akka.util.Logging -import com.sun.jersey.api.core.ResourceConfig -import com.sun.jersey.spi.container.servlet.ServletContainer -import com.sun.jersey.spi.container.WebApplication - import java.util.{List => JList} import javax.servlet.{ServletConfig} import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - -import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} import org.atmosphere.container.{GrizzlyCometSupport} +import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} import org.atmosphere.jersey.JerseyBroadcaster -/** - * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container, - * e.g. not using the Akka Kernel. - * - * @author Jonas Bonér - */ -class AkkaServlet extends ServletContainer { - import org.scala_tools.javautils.Imports._ - - override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { - Kernel.boot // will boot if not already booted by 'main' - - val configurators = ConfiguratorRepository.getConfigurators - - resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava) - resourceConfig.getProperties.put( - "com.sun.jersey.spi.container.ResourceFilters", - Config.config.getList("akka.rest.filters").mkString(",")) - - webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators)) - } -} /** * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a @@ -79,19 +51,19 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging atmosphereHandlers.put("/*", new AtmosphereServlet.AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster)) } - override def createCometSupportResolver() : CometSupportResolver = { + override def createCometSupportResolver() : CometSupportResolver = { import org.scala_tools.javautils.Imports._ - + new DefaultCometSupportResolver(config) { type CS = CometSupport[_ <: AtmosphereResource[_,_]] override def resolveMultipleNativeSupportConflict(available : JList[Class[_ <: CS]]) : CS = { available.asScala.filter(_ != classOf[GrizzlyCometSupport]).toList match { - case Nil => new GrizzlyCometSupport(config) + case Nil => new GrizzlyCometSupport(config) case x :: Nil => newCometSupport(x.asInstanceOf[Class[_ <: CS]]) - case _ => super.resolveMultipleNativeSupportConflict(available) + case _ => super.resolveMultipleNativeSupportConflict(available) } } - + override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CS = { val predef = config.getInitParameter("cometSupport") if(testClassExists(predef)) diff --git a/akka-comet/src/main/scala/BootableCometActorService.scala b/akka-comet/src/main/scala/BootableCometActorService.scala new file mode 100644 index 0000000000..84bb52a14e --- /dev/null +++ b/akka-comet/src/main/scala/BootableCometActorService.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.comet + +import com.sun.grizzly.http.SelectorThread +import com.sun.grizzly.http.servlet.ServletAdapter +import com.sun.grizzly.standalone.StaticStreamAlgorithm + +import javax.ws.rs.core.UriBuilder +import se.scalablesolutions.akka.actor.BootableActorLoaderService +import se.scalablesolutions.akka.util.{Bootable,Logging} + + +trait BootableCometActorService extends Bootable with Logging { + self : BootableActorLoaderService => + + import Config._ + + val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost") + val REST_URL = "http://" + REST_HOSTNAME + val REST_PORT = config.getInt("akka.rest.port", 9998) + protected var jerseySelectorThread: Option[SelectorThread] = None + + abstract override def onLoad = { + super.onLoad + if(config.getBool("akka.rest.service", true)){ + + val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() + + val scheme = uri.getScheme + if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException( + "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") + + val adapter = new ServletAdapter + adapter.setHandleStaticResources(true) + adapter.setServletInstance(new AkkaCometServlet) + adapter.setContextPath(uri.getPath) + //Using autodetection for now + adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") + if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") + log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath) + + val ah = new com.sun.grizzly.arp.DefaultAsyncHandler + ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter) + jerseySelectorThread = Some(new SelectorThread).map { t => + t.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName) + t.setPort(REST_PORT) + t.setAdapter(adapter) + t.setEnableAsyncExecution(true) + t.setAsyncHandler(ah) + t.listen + t } + + log.info("REST service started successfully. Listening to port [%s]", REST_PORT) + } + } + + abstract override def onUnload = { + super.onUnload + + if (jerseySelectorThread.isDefined) { + log.info("Shutting down REST service (Jersey)") + jerseySelectorThread.get.stopEndpoint + } + } +} \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala new file mode 100644 index 0000000000..c4735a6e9c --- /dev/null +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.actor + +import java.io.File +import java.net.URLClassLoader +import se.scalablesolutions.akka.util.{Bootable,Logging} + +trait BootableActorLoaderService extends Bootable with Logging { + import Config._ + + val BOOT_CLASSES = config.getList("akka.boot") + var applicationLoader: Option[ClassLoader] = None + + protected def runApplicationBootClasses : Option[ClassLoader] = { + val loader = + if (HOME.isDefined) { + val CONFIG = HOME.get + "/config" + val DEPLOY = HOME.get + "/deploy" + val DEPLOY_DIR = new File(DEPLOY) + if (!DEPLOY_DIR.exists) { + log.error("Could not find a deploy directory at [%s]", DEPLOY) + System.exit(-1) + } + val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL + log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) + new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) + } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) { + getClass.getClassLoader + } else throw new IllegalStateException( + "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") + for (clazz <- BOOT_CLASSES) { + log.info("Loading boot class [%s]", clazz) + loader.loadClass(clazz).newInstance + } + + Some(loader) + } + + abstract override def onLoad = { + applicationLoader = runApplicationBootClasses + super.onLoad + } + + abstract override def onUnload = { + ActorRegistry.shutdownAll + } +} \ No newline at end of file diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala new file mode 100644 index 0000000000..bdee07944b --- /dev/null +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.remote + +import se.scalablesolutions.akka.actor.BootableActorLoaderService +import se.scalablesolutions.akka.util.{Bootable,Logging} + +trait BootableRemoteActorService extends Bootable with Logging { + self : BootableActorLoaderService => + + import Config._ + + protected lazy val remoteServerThread = new Thread(new Runnable() { + def run = RemoteNode.start(self.applicationLoader) + }, "Akka Remote Service") + + def startRemoteService = remoteServerThread.start + + abstract override def onLoad = { + super.onLoad + if(config.getBool("akka.remote.server.service", true)) startRemoteService + } + + abstract override def onUnload = { + super.onUnload + + if (remoteServerThread.isAlive) { + log.info("Shutting down remote service") + RemoteNode.shutdown + remoteServerThread.join(1000) + } + } +} \ No newline at end of file diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java index 4eb17ce7e6..bae492d0e1 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java @@ -60,7 +60,7 @@ public class RestTest extends TestCase { */ private static SelectorThread startJersey() { try { - Servlet servlet = new se.scalablesolutions.akka.AkkaServlet(); + Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet(); ServletAdapter adapter = new ServletAdapter(); adapter.setServletInstance(servlet); adapter.setContextPath(URI.getPath()); diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml index c7d0c3134e..5eceb218f5 100755 --- a/akka-kernel/pom.xml +++ b/akka-kernel/pom.xml @@ -47,6 +47,11 @@ ${project.groupId} ${project.version} + + akka-comet + ${project.groupId} + ${project.version} + diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index 8d684c4619..7cc7a9ce35 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -4,17 +4,10 @@ package se.scalablesolutions.akka -import com.sun.grizzly.http.SelectorThread -import com.sun.grizzly.http.servlet.ServletAdapter -import com.sun.grizzly.standalone.StaticStreamAlgorithm - -import javax.ws.rs.core.UriBuilder -import java.io.File -import java.net.URLClassLoader - -import se.scalablesolutions.akka.remote.RemoteNode +import se.scalablesolutions.akka.comet.BootableCometActorService +import se.scalablesolutions.akka.remote.{RemoteNode,BootableRemoteActorService} import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.actor.ActorRegistry +import se.scalablesolutions.akka.actor.{ActorRegistry,BootableActorLoaderService} /** * The Akka Kernel. @@ -24,22 +17,12 @@ import se.scalablesolutions.akka.actor.ActorRegistry object Kernel extends Logging { import Config._ - val BOOT_CLASSES = config.getList("akka.boot") - val RUN_REMOTE_SERVICE = config.getBool("akka.remote.server.service", true) - val RUN_REST_SERVICE = config.getBool("akka.rest.service", true) - val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost") - val REST_URL = "http://" + REST_HOSTNAME - val REST_PORT = config.getInt("akka.rest.port", 9998) - // FIXME add API to shut server down gracefully @volatile private var hasBooted = false - private var jerseySelectorThread: Option[SelectorThread] = None + private val startTime = System.currentTimeMillis - private var applicationLoader: Option[ClassLoader] = None - - private lazy val remoteServerThread = new Thread(new Runnable() { - def run = RemoteNode.start(applicationLoader) - }, "Akka Remote Service") + + object Bundles extends BootableActorLoaderService with BootableRemoteActorService with BootableCometActorService def main(args: Array[String]) = boot @@ -56,11 +39,7 @@ object Kernel extends Logging { if (!hasBooted) { if (withBanner) printBanner log.info("Starting Akka...") - - runApplicationBootClasses - if (RUN_REMOTE_SERVICE) startRemoteService - if (RUN_REST_SERVICE) startREST - + Bundles.onLoad Thread.currentThread.setContextClassLoader(getClass.getClassLoader) log.info("Akka started successfully") hasBooted = true @@ -71,74 +50,12 @@ object Kernel extends Logging { def shutdown = synchronized { if (hasBooted) { log.info("Shutting down Akka...") - ActorRegistry.shutdownAll - if (jerseySelectorThread.isDefined) { - log.info("Shutting down REST service (Jersey)") - jerseySelectorThread.get.stopEndpoint - } - if (remoteServerThread.isAlive) { - log.info("Shutting down remote service") - RemoteNode.shutdown - remoteServerThread.join(1000) - } + Bundles.onUnload log.info("Akka succesfully shut down") } } - def startRemoteService = remoteServerThread.start - - def startREST = { - val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() - - val scheme = uri.getScheme - if (!scheme.equalsIgnoreCase("http")) throw new IllegalArgumentException( - "The URI scheme, of the URI " + REST_URL + ", must be equal (ignoring case) to 'http'") - - val adapter = new ServletAdapter - adapter.setHandleStaticResources(true) - adapter.setServletInstance(new AkkaCometServlet) - adapter.setContextPath(uri.getPath) - //Using autodetection for now - adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") - if (HOME.isDefined) adapter.setRootFolder(HOME.get + "/deploy/root") - log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolder, adapter.getContextPath) - - val ah = new com.sun.grizzly.arp.DefaultAsyncHandler - ah.addAsyncFilter(new com.sun.grizzly.comet.CometAsyncFilter) - jerseySelectorThread = Some(new SelectorThread) - jerseySelectorThread.get.setAlgorithmClassName(classOf[StaticStreamAlgorithm].getName) - jerseySelectorThread.get.setPort(REST_PORT) - jerseySelectorThread.get.setAdapter(adapter) - jerseySelectorThread.get.setEnableAsyncExecution(true) - jerseySelectorThread.get.setAsyncHandler(ah) - jerseySelectorThread.get.listen - - log.info("REST service started successfully. Listening to port [%s]", REST_PORT) - } - - private def runApplicationBootClasses = { - val loader = - if (HOME.isDefined) { - val CONFIG = HOME.get + "/config" - val DEPLOY = HOME.get + "/deploy" - val DEPLOY_DIR = new File(DEPLOY) - if (!DEPLOY_DIR.exists) { - log.error("Could not find a deploy directory at [%s]", DEPLOY) - System.exit(-1) - } - val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL - log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) - new URLClassLoader(toDeploy.toArray, getClass.getClassLoader) - } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) { - getClass.getClassLoader - } else throw new IllegalStateException( - "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") - for (clazz <- BOOT_CLASSES) { - log.info("Loading boot class [%s]", clazz) - loader.loadClass(clazz).newInstance - } - applicationLoader = Some(loader) - } + def startRemoteService = Bundles.startRemoteService private def printBanner = { log.info( diff --git a/akka-rest/pom.xml b/akka-rest/pom.xml index 32fbad0997..c354831cd4 100644 --- a/akka-rest/pom.xml +++ b/akka-rest/pom.xml @@ -23,11 +23,16 @@ ${project.version} - + - com.sun.grizzly - grizzly-comet-webserver - ${grizzly.version} + javax.servlet + servlet-api + 2.5 + + + com.sun.jersey + jersey-core + ${jersey.version} com.sun.jersey diff --git a/akka-rest/src/main/scala/AkkaServlet.scala b/akka-rest/src/main/scala/AkkaServlet.scala new file mode 100755 index 0000000000..4661ab5c14 --- /dev/null +++ b/akka-rest/src/main/scala/AkkaServlet.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.rest + +import se.scalablesolutions.akka.config.ConfiguratorRepository + +import com.sun.jersey.api.core.ResourceConfig +import com.sun.jersey.spi.container.servlet.ServletContainer +import com.sun.jersey.spi.container.WebApplication + +/** + * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container, + * e.g. not using the Akka Kernel. + * + * @author Jonas Bonér + */ +class AkkaServlet extends ServletContainer { + import org.scala_tools.javautils.Imports._ + + override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { + //Kernel.boot // will boot if not already booted by 'main' + + val configurators = ConfiguratorRepository.getConfigurators + + resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces).asJava) + resourceConfig.getProperties.put( + "com.sun.jersey.spi.container.ResourceFilters", + Config.config.getList("akka.rest.filters").mkString(",")) + + webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators)) + } +} \ No newline at end of file diff --git a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml index c95dc59b2c..23348604bb 100755 --- a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml +++ b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml @@ -13,7 +13,7 @@ AkkaServlet - se.scalablesolutions.akka.AkkaServlet + se.scalablesolutions.akka.rest.AkkaServlet AkkaServlet diff --git a/akka-samples-security/src/main/webapp/WEB-INF/web.xml b/akka-samples-security/src/main/webapp/WEB-INF/web.xml index d0dfea26a8..02a6f7d918 100644 --- a/akka-samples-security/src/main/webapp/WEB-INF/web.xml +++ b/akka-samples-security/src/main/webapp/WEB-INF/web.xml @@ -8,7 +8,7 @@ AkkaServlet - se.scalablesolutions.akka.AkkaServlet + se.scalablesolutions.akka.rest.AkkaServlet diff --git a/akka-util/src/main/scala/Bootable.scala b/akka-util/src/main/scala/Bootable.scala new file mode 100644 index 0000000000..17de26f2fd --- /dev/null +++ b/akka-util/src/main/scala/Bootable.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.util + +trait Bootable { + def onLoad : Unit = () + def onUnload : Unit = () +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 07647bb6d1..14b5791d76 100755 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ akka-persistence-cassandra akka-persistence-mongo akka-rest + akka-comet akka-amqp akka-security akka-kernel