diff --git a/akka-camel/src/test/scala/CamelContextLifecycleTest.scala b/akka-camel/src/test/scala/CamelContextLifecycleTest.scala index b8caea0805..cf558ec8d9 100644 --- a/akka-camel/src/test/scala/CamelContextLifecycleTest.scala +++ b/akka-camel/src/test/scala/CamelContextLifecycleTest.scala @@ -13,7 +13,9 @@ class CamelContextLifecycleTest extends JUnitSuite with CamelContextLifecycle { init(ctx) assert(context.isStreamCaching === true) assert(!context.asInstanceOf[TestCamelContext].isStarted) - assert(!template.asInstanceOf[DefaultProducerTemplate].isStarted) + // In Camel 2.3 CamelComtext.createProducerTemplate starts + // the template before returning it (wasn't started in 2.2) + assert(template.asInstanceOf[DefaultProducerTemplate].isStarted) start assert(context.asInstanceOf[TestCamelContext].isStarted) assert(template.asInstanceOf[DefaultProducerTemplate].isStarted) diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index de0d174fc7..a44db8fd07 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -46,12 +46,11 @@ trait BootableActorLoaderService extends Bootable with Logging { log.debug("Loading dependencies [%s]", dependencyJars) val allJars = toDeploy ::: dependencyJars - val parentClassLoader = classOf[Seq[_]].getClassLoader URLClassLoader.newInstance( allJars.toArray.asInstanceOf[Array[URL]], - ClassLoader.getSystemClassLoader) + Thread.currentThread.getContextClassLoader) //parentClassLoader) - } else getClass.getClassLoader) + } else Thread.currentThread.getContextClassLoader) } abstract override def onLoad = { @@ -62,5 +61,8 @@ trait BootableActorLoaderService extends Bootable with Logging { super.onLoad } - abstract override def onUnload = ActorRegistry.shutdownAll + abstract override def onUnload = { + super.onUnload + ActorRegistry.shutdownAll + } } diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 948cb07a8b..66f7b59baa 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -23,25 +23,6 @@ trait Cluster { */ def name: String - /** - * Adds the specified hostname + port as a local node - * This information will be propagated to other nodes in the cluster - * and will be available at the other nodes through lookup and foreach - */ - def registerLocalNode(hostname: String, port: Int): Unit - - /** - * Removes the specified hostname + port from the local node - * This information will be propagated to other nodes in the cluster - * and will no longer be available at the other nodes through lookup and foreach - */ - def deregisterLocalNode(hostname: String, port: Int): Unit - - /** - * Sends the message to all Actors of the specified type on all other nodes in the cluster - */ - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit - /** * Traverses all known remote addresses avaiable at all other nodes in the cluster * and applies the given PartialFunction on the first address that it's defined at @@ -65,8 +46,6 @@ trait ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name", "default") @volatile protected var serializer : Serializer = _ - - private[remote] def setSerializer(s : Serializer) : Unit = serializer = s } /** @@ -87,6 +66,7 @@ private[akka] object ClusterActor { private[akka] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage private[akka] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage private[akka] case class Node(endpoints: List[RemoteAddress]) + private[akka] case class InitClusterActor(serializer : Serializer) } /** @@ -168,6 +148,10 @@ abstract class BasicClusterActor extends ClusterActor with Logging { local = Node(local.endpoints.filterNot(_ == s)) broadcast(Papers(local.endpoints)) } + + case InitClusterActor(s) => { + serializer = s + } } /** @@ -206,24 +190,6 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Applies the given function to all remote addresses known */ def foreach(f: (RemoteAddress) => Unit): Unit = remotes.valuesIterator.toList.flatMap(_.endpoints).foreach(f) - - /** - * Registers a local endpoint - */ - def registerLocalNode(hostname: String, port: Int): Unit = - self ! RegisterLocalNode(RemoteAddress(hostname, port)) - - /** - * Deregisters a local endpoint - */ - def deregisterLocalNode(hostname: String, port: Int): Unit = - self ! DeregisterLocalNode(RemoteAddress(hostname, port)) - - /** - * Broadcasts the specified message to all Actors of type Class on all known Nodes - */ - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = - self ! RelayedMessage(to.getName, msg) } /** @@ -232,28 +198,22 @@ abstract class BasicClusterActor extends ClusterActor with Logging { * Loads a specified ClusterActor and delegates to that instance. */ object Cluster extends Cluster with Logging { + //Import messages + import ClusterActor._ + lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName lazy val DEFAULT_CLUSTER_ACTOR_CLASS_NAME = classOf[JGroupsClusterActor].getName - @volatile private[remote] var clusterActor: Option[ClusterActor] = None @volatile private[remote] var clusterActorRef: Option[ActorRef] = None + @volatile private[akka] var classLoader : Option[ClassLoader] = Some(getClass.getClassLoader) - private[remote] def createClusterActor(loader: ClassLoader): Option[ActorRef] = { + private[remote] def createClusterActor(): Option[ActorRef] = { val name = config.getString("akka.remote.cluster.actor", DEFAULT_CLUSTER_ACTOR_CLASS_NAME) if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") - val serializer = Class.forName(config.getString( - "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] - serializer.classLoader = Some(loader) - try { - Some(Actor.actorOf { - val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor] - a setSerializer serializer - a - }) + Some(Actor.actorOf(Class.forName(name).newInstance.asInstanceOf[ClusterActor])) } catch { case e => log.error(e, "Couldn't load Cluster provider: [%s]", name) @@ -267,15 +227,27 @@ object Cluster extends Cluster with Logging { RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), Supervise(actor, LifeCycle(Permanent)) :: Nil))) + private[this] def clusterActor = if(clusterActorRef.isEmpty) None else Some(clusterActorRef.get.actor.asInstanceOf[ClusterActor]) + def name = clusterActor.map(_.name).getOrElse("No cluster") def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf)) - def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.registerLocalNode(hostname, port)) + /**Adds the specified hostname + port as a local node + * This information will be propagated to other nodes in the cluster + * and will be available at the other nodes through lookup and foreach + */ + def registerLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! RegisterLocalNode(RemoteAddress(hostname, port))) - def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port)) + /**Removes the specified hostname + port from the local node + * This information will be propagated to other nodes in the cluster + * and will no longer be available at the other nodes through lookup and foreach + */ + def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActorRef.foreach(_ ! DeregisterLocalNode(RemoteAddress(hostname, port))) - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) + /**Sends the message to all Actors of the specified type on all other nodes in the cluster + */ + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActorRef.foreach(_ ! RelayedMessage(to.getName, msg)) def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) @@ -283,14 +255,21 @@ object Cluster extends Cluster with Logging { def start(serializerClassLoader: Option[ClassLoader]): Unit = synchronized { log.info("Starting up Cluster Service...") - if (clusterActor.isEmpty) { + if (clusterActorRef.isEmpty) { for { - actorRef <- createClusterActor(serializerClassLoader getOrElse getClass.getClassLoader) + actorRef <- createClusterActor() sup <- createSupervisor(actorRef) } { - clusterActorRef = Some(actorRef.start) - clusterActor = Some(actorRef.actor.asInstanceOf[ClusterActor]) + val serializer = Class.forName(config.getString( + "akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] + + classLoader = serializerClassLoader orElse classLoader + serializer.classLoader = classLoader + actorRef.start sup.start + actorRef ! InitClusterActor(serializer) + clusterActorRef = Some(actorRef) } } } @@ -301,6 +280,6 @@ object Cluster extends Cluster with Logging { c <- clusterActorRef s <- c.supervisor } s.stop - clusterActor = None + classLoader = Some(getClass.getClassLoader) } } diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java deleted file mode 100644 index d6d4ae9060..0000000000 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.api; - -import com.sun.grizzly.http.SelectorThread; -import com.sun.grizzly.http.servlet.ServletAdapter; -import com.sun.grizzly.tcp.Adapter; -import com.sun.grizzly.standalone.StaticStreamAlgorithm; - -import javax.ws.rs.core.UriBuilder; -import javax.servlet.Servlet; - -import junit.framework.TestCase; -import org.junit.*; - -import java.io.IOException; -import java.net.URI; - -import se.scalablesolutions.akka.config.*; -import static se.scalablesolutions.akka.config.JavaConfig.*; - - -public class RestTest extends TestCase { - - private static int PORT = 9998; - private static URI URI = UriBuilder.fromUri("http://localhost/").port(PORT).build(); - private static SelectorThread selector = null; - private static ActiveObjectConfigurator conf = new ActiveObjectConfigurator(); - - @BeforeClass - protected void setUp() { - conf.configure( - new RestartStrategy(new AllForOne(), 3, 5000, new Class[]{Exception.class}), - new Component[] { - new Component( - JerseyFoo.class, - new LifeCycle(new Permanent()), - 10000000) - }).inject().supervise(); - selector = startJersey(); - } - - public void testSimpleRequest() { - assertTrue(true); - } - -/* - - @Test - public void testSimpleRequest() throws IOException, InstantiationException { - selector.listen(); - Client client = Client.create(); - WebResource webResource = client.resource(URI); - String responseMsg = webResource.path("/foo").get(String.class); - assertEquals("hello foo", responseMsg); - selector.stopEndpoint(); - } -*/ - private static SelectorThread startJersey() { - try { - Servlet servlet = new se.scalablesolutions.akka.rest.AkkaServlet(); - ServletAdapter adapter = new ServletAdapter(); - adapter.setServletInstance(servlet); - adapter.setContextPath(URI.getPath()); - return createGrizzlySelector(adapter, URI, PORT); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static SelectorThread createGrizzlySelector(Adapter adapter, URI uri, int port) throws IOException, InstantiationException { - final String scheme = uri.getScheme(); - if (!scheme.equalsIgnoreCase("http")) - throw new IllegalArgumentException("The URI scheme, of the URI " + uri + ", must be equal (ignoring case) to 'http'"); - final SelectorThread selectorThread = new SelectorThread(); - selectorThread.setAlgorithmClassName(StaticStreamAlgorithm.class.getName()); - selectorThread.setPort(port); - selectorThread.setAdapter(adapter); - return selectorThread; - } -} - diff --git a/akka-http/src/main/scala/ActorComponentProvider.scala b/akka-http/src/main/scala/ActorComponentProvider.scala deleted file mode 100644 index 52512b6929..0000000000 --- a/akka-http/src/main/scala/ActorComponentProvider.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.rest - -import com.sun.jersey.core.spi.component.ComponentScope -import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider - -import se.scalablesolutions.akka.config.Configurator -import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.actor.Actor - -class ActorComponentProvider(val clazz: Class[_], val configurators: List[Configurator]) - extends IoCFullyManagedComponentProvider with Logging { - - override def getScope = ComponentScope.Singleton - - override def getInstance: AnyRef = { - val instances = for { - conf <- configurators - if conf.isDefined(clazz) - instance <- conf.getInstance(clazz) - } yield instance - if (instances.isEmpty) throw new IllegalArgumentException( - "No Actor or Active Object for class [" + clazz + "] could be found.\nMake sure you have defined and configured the class as an Active Object or Actor in a supervisor hierarchy.") - else instances.head.asInstanceOf[AnyRef] - } -} diff --git a/akka-http/src/main/scala/ActorComponentProviderFactory.scala b/akka-http/src/main/scala/ActorComponentProviderFactory.scala deleted file mode 100644 index e1ea94347f..0000000000 --- a/akka-http/src/main/scala/ActorComponentProviderFactory.scala +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.rest - -import com.sun.jersey.core.spi.component.ioc.{IoCComponentProvider,IoCComponentProviderFactory} -import com.sun.jersey.core.spi.component.{ComponentContext} - -import se.scalablesolutions.akka.config.Configurator -import se.scalablesolutions.akka.util.Logging - -class ActorComponentProviderFactory(val configurators: List[Configurator]) -extends IoCComponentProviderFactory with Logging { - override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz) - - override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = { - configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null) - } -} diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala index 0ce6f8312a..01fe5272be 100644 --- a/akka-http/src/main/scala/AkkaCometServlet.scala +++ b/akka-http/src/main/scala/AkkaCometServlet.scala @@ -5,25 +5,17 @@ package se.scalablesolutions.akka.comet import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.rest.{AkkaServlet => RestServlet} import java.util.{List => JList} import javax.servlet.ServletConfig import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import com.sun.jersey.spi.container.servlet.ServletContainer import org.atmosphere.container.GrizzlyCometSupport import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} -/** - * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a - * standard servlet container, e.g. not using the Akka Kernel. - *

- * Used by the Akka Kernel to bootstrap REST and Comet. - */ -class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { - val servlet = new RestServlet with AtmosphereServletProcessor { - +class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProcessor { //Delegate to implement the behavior for AtmosphereHandler private val handler = new AbstractReflectorAtmosphereHandler { override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) { @@ -44,6 +36,18 @@ class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { } } +/** + * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a + * standard servlet container, e.g. not using the Akka Kernel. + *

+ * Used by the Akka Kernel to bootstrap REST and Comet. + */ +class AkkaServlet extends org.atmosphere.cpr.AtmosphereServlet with Logging { + lazy val servlet = createRestServlet + + protected def createRestServlet : AtmosphereRestServlet = new AtmosphereRestServlet { + override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key) + } /** * We override this to avoid Atmosphere looking for it's atmosphere.xml file * Instead we specify what semantics we want in code. diff --git a/akka-http/src/main/scala/AkkaServlet.scala b/akka-http/src/main/scala/AkkaServlet.scala deleted file mode 100644 index aecdaa9671..0000000000 --- a/akka-http/src/main/scala/AkkaServlet.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.rest - -import se.scalablesolutions.akka.config.ConfiguratorRepository -import se.scalablesolutions.akka.config.Config.config - -import com.sun.jersey.api.core.ResourceConfig -import com.sun.jersey.spi.container.servlet.ServletContainer -import com.sun.jersey.spi.container.WebApplication - -/** - * Akka's servlet to be used when deploying actors exposed as REST services in a standard servlet container, - * e.g. not using the Akka Kernel. - * - * @author Jonas Bonér - */ -class AkkaServlet extends ServletContainer { - import scala.collection.JavaConversions._ - - override def initiate(resourceConfig: ResourceConfig, webApplication: WebApplication) = { - val configurators = ConfiguratorRepository.getConfigurators - - resourceConfig.getClasses.addAll(configurators.flatMap(_.getComponentInterfaces)) - resourceConfig.getProperties.put( - "com.sun.jersey.spi.container.ResourceFilters", - config.getList("akka.rest.filters").mkString(",")) - - webApplication.initiate(resourceConfig, new ActorComponentProviderFactory(configurators)) - } -} diff --git a/akka-kernel/src/main/scala/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/EmbeddedAppServer.scala index a49d6276d1..41e6dd44ae 100644 --- a/akka-kernel/src/main/scala/EmbeddedAppServer.scala +++ b/akka-kernel/src/main/scala/EmbeddedAppServer.scala @@ -5,14 +5,15 @@ package se.scalablesolutions.akka.kernel import com.sun.grizzly.http.SelectorThread -import com.sun.grizzly.http.servlet.ServletAdapter +import com.sun.grizzly.http.servlet.{ ServletAdapter } import com.sun.grizzly.standalone.StaticStreamAlgorithm import javax.ws.rs.core.UriBuilder +import javax.servlet.ServletConfig import se.scalablesolutions.akka.actor.BootableActorLoaderService import se.scalablesolutions.akka.util.{Bootable, Logging} -import se.scalablesolutions.akka.comet.AkkaServlet +import se.scalablesolutions.akka.comet.{ AkkaServlet } /** * Handles the Akka Comet Support (load/unload) @@ -42,9 +43,31 @@ trait EmbeddedAppServer extends Bootable with Logging { val adapter = new ServletAdapter adapter.setHandleStaticResources(true) - adapter.setServletInstance(new AkkaServlet) + adapter.setServletInstance(new AkkaServlet { + override def init(sc : ServletConfig) : Unit = { + val cl = Thread.currentThread.getContextClassLoader + try { + Thread.currentThread.setContextClassLoader(applicationLoader.get) + super.init(sc) + } + finally { + Thread.currentThread.setContextClassLoader(cl) + } + } + }) + adapter.setContextPath(uri.getPath) - adapter.addInitParameter("cometSupport", "org.atmosphere.container.GrizzlyCometSupport") + adapter.addInitParameter("cometSupport", + "org.atmosphere.container.GrizzlyCometSupport") + adapter.addInitParameter("com.sun.jersey.config.property.resourceConfigClass", + "com.sun.jersey.api.core.PackagesResourceConfig") + adapter.addInitParameter("com.sun.jersey.config.property.packages", + config.getList("akka.rest.resource_packages").mkString(";") + ) + adapter.addInitParameter("com.sun.jersey.spi.container.ResourceFilters", + config.getList("akka.rest.filters").mkString(",") + ) + if (HOME.isDefined) adapter.addRootFolder(HOME.get + "/deploy/root") log.info("REST service root path [%s] and context path [%s]", adapter.getRootFolders, adapter.getContextPath) @@ -65,9 +88,10 @@ trait EmbeddedAppServer extends Bootable with Logging { abstract override def onUnload = { super.onUnload - if (jerseySelectorThread.isDefined) { + jerseySelectorThread foreach { (t) => { log.info("Shutting down REST service (Jersey)") - jerseySelectorThread.get.stopEndpoint + t.stopEndpoint + } } } } diff --git a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala index 810f31aba5..5b08e15a1a 100644 --- a/akka-samples/akka-sample-camel/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-camel/src/main/scala/Boot.scala @@ -45,21 +45,19 @@ class Boot { // Publish subscribe example // - // Cometd example is disabled because of unresolved sbt/ivy dependency resolution issues. - // If you want to run this example, make sure to replace all jetty-*-6.1.22.jar files - // on the classpath with corresponding jetty-*-6.1.11.jar files. + // Cometd example commented out because camel-cometd is broken in Camel 2.3 // - //val cometdUri = "cometd://localhost:8111/test/abc?resourceBase=target" - //val cometdSubscriber = new Subscriber("cometd-subscriber", cometdUri).start - //val cometdPublisher = new Publisher("cometd-publisher", cometdUri).start + //val cometdUri = "cometd://localhost:8111/test/abc?baseResource=file:target" + //val cometdSubscriber = actorOf(new Subscriber("cometd-subscriber", cometdUri)).start + //val cometdPublisher = actorOf(new Publisher("cometd-publisher", cometdUri)).start val jmsUri = "jms:topic:test" val jmsSubscriber1 = actorOf(new Subscriber("jms-subscriber-1", jmsUri)).start val jmsSubscriber2 = actorOf(new Subscriber("jms-subscriber-2", jmsUri)).start val jmsPublisher = actorOf(new Publisher("jms-publisher", jmsUri)).start - //val cometdPublisherBridge = new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher).start + //val cometdPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/cometd", cometdPublisher)).start val jmsPublisherBridge = actorOf(new PublisherBridge("jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher)).start actorOf[Consumer4].start // POSTing "stop" to http://0.0.0.0:8877/camel/stop stops and unpublishes this actor diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index 6e67772f7e..e8f3576e9f 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -11,12 +11,12 @@ import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.comet.AkkaClusterBroadcastFilter - +import scala.xml.NodeSeq import java.lang.Integer import java.nio.ByteBuffer import javax.ws.rs.core.MultivaluedMap import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} - +import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} @@ -27,16 +27,13 @@ class Boot { SupervisorConfig( RestartStrategy(OneForOne, 3, 100,List(classOf[Exception])), Supervise( - actorOf[SimpleService], + actorOf[SimpleServiceActor], LifeCycle(Permanent)) :: Supervise( - actorOf[Chat], + actorOf[ChatActor], LifeCycle(Permanent)) :: Supervise( - actorOf[PersistentSimpleService], - LifeCycle(Permanent)) :: - Supervise( - actorOf[PubSub], + actorOf[PersistentSimpleServiceActor], LifeCycle(Permanent)) :: Nil)) factory.newInstance.start @@ -50,19 +47,25 @@ class Boot { * Or browse to the URL from a web browser. */ @Path("/scalacount") -class SimpleService extends Transactor { - - case object Tick +class SimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type SimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[SimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} +class SimpleServiceActor extends Transactor { private val KEY = "COUNTER" private var hasStartedTicking = false private lazy val storage = TransactionalState.newMap[String, Integer] - @GET - @Produces(Array("text/html")) - def count = (self !! Tick).getOrElse(Error in counter) - def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val counter = storage.get(KEY).get.asInstanceOf[Integer].intValue storage.put(KEY, new Integer(counter + 1)) self.reply(Tick:{counter + 1}) @@ -75,9 +78,7 @@ class SimpleService extends Transactor { } @Path("/pubsub/") -class PubSub extends Actor { - case class Msg(topic: String, message: String) - +class PubSub { @GET @Suspend @Produces(Array("text/plain;charset=ISO-8859-1")) @@ -90,8 +91,6 @@ class PubSub extends Actor { @Produces(Array("text/plain;charset=ISO-8859-1")) //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "foo") def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) - - def receive = { case _ => } } /** @@ -102,19 +101,26 @@ class PubSub extends Actor { * Or browse to the URL from a web browser. */ @Path("/persistentscalacount") -class PersistentSimpleService extends Transactor { +class PersistentSimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[PersistentSimpleServiceActor]).headOption + r <- a.!![NodeSeq]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} - case object Tick +class PersistentSimpleServiceActor extends Transactor { private val KEY = "COUNTER" private var hasStartedTicking = false private lazy val storage = CassandraStorage.newMap - @GET - @Produces(Array("text/html")) - def count = (self !! Tick).getOrElse(Error in counter) - def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val bytes = storage.get(KEY.getBytes).get val counter = ByteBuffer.wrap(bytes).getInt storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) @@ -128,16 +134,37 @@ class PersistentSimpleService extends Transactor { } @Path("/chat") -class Chat extends Actor with Logging { - case class Chat(val who: String, val what: String, val msg: String) - +class Chat { + import ChatActor.ChatMsg @Suspend @GET @Produces(Array("text/html")) def suspend = () + @POST + @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) + //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar") + @Consumes(Array("application/x-www-form-urlencoded")) + @Produces(Array("text/html")) + def publishMessage(form: MultivaluedMap[String, String]) = { + val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) + //Fetch the first actor of type ChatActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[ChatActor]).headOption + r <- a.!![String](msg)} yield r + //Return either the resulting String or a default one + result getOrElse "System__error" + } +} + +object ChatActor { + case class ChatMsg(val who: String, val what: String, val msg: String) +} + +class ChatActor extends Actor with Logging { + import ChatActor.ChatMsg def receive = { - case Chat(who, what, msg) => { + case ChatMsg(who, what, msg) => { what match { case "login" => self.reply("System Message__" + who + " has joined.") case "post" => self.reply("" + who + "__" + msg) @@ -146,16 +173,6 @@ class Chat extends Actor with Logging { } case x => log.info("recieve unknown: " + x) } - - @POST - @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) - //FIXME @Cluster(value = Array(classOf[AkkaClusterBroadcastFilter]),name = "bar") - @Consumes(Array("application/x-www-form-urlencoded")) - @Produces(Array("text/html")) - def publishMessage(form: MultivaluedMap[String, String]) = - (self !! Chat(form.getFirst("name"), - form.getFirst("action"), - form.getFirst("message"))).getOrElse("System__error") } diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index 680aabd718..4fd5a8a63d 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -10,6 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo} import se.scalablesolutions.akka.stm.TransactionalState +import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor class Boot { val factory = SupervisorFactory( @@ -90,12 +91,7 @@ import javax.annotation.security.{RolesAllowed, DenyAll, PermitAll} import javax.ws.rs.{GET, Path, Produces} @Path("/secureticker") -class SecureTickActor extends Transactor with Logging { - - case object Tick - private val KEY = "COUNTER" - private var hasStartedTicking = false - private lazy val storage = TransactionalState.newMap[String, Integer] +class SecureTickService { /** * allow access for any user to "/secureticker/public" @@ -123,15 +119,25 @@ class SecureTickActor extends Transactor with Logging { @DenyAll def paranoiaTick = tick - def tick = (self !! Tick) match { - case (Some(counter)) => (Tick: - {counter} - ) - case _ => (Error in counter) + def tick = { + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NdeSeq back + val result = for{a <- actorsFor(classOf[SecureTickActor]).headOption + r <- a.!![Integer]("Tick")} yield r + //Return either the resulting NodeSeq or a default one + result match { + case (Some(counter)) => (Tick: {counter}) + case _ => (Error in counter) + } } +} +class SecureTickActor extends Transactor with Logging { + private val KEY = "COUNTER" + private var hasStartedTicking = false + private lazy val storage = TransactionalState.newMap[String, Integer] def receive = { - case Tick => if (hasStartedTicking) { + case "Tick" => if (hasStartedTicking) { val counter = storage.get(KEY).get.intValue storage.put(KEY, counter + 1) self.reply(new Integer(counter + 1)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index c0499632c8..5a2b811a05 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -54,6 +54,7 @@ hostname = "localhost" port = 9998 filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use + resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) #IF you are using a KerberosAuthenticationActor diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index c4dd16a7df..daf43c2ca4 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -40,20 +40,38 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { lazy val dist = zipTask(allArtifacts, "dist", distName) dependsOn (`package`) describedAs("Zips up the distribution.") - // ------------------------------------------------------------ - // repositories - val embeddedrepo = "embedded repo" at (info.projectPath / "embedded-repo").asURL.toString - val sunjdmk = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo" + // ------------------------------------------------------------------------------------------------------------------- + // Repositories + // Every dependency that cannot be resolved from the built-in repositories (Maven Central and Scala Tools Releases) + // must be resolved from a ModuleConfiguration. This will result in a significant acceleration of the update action. + // Therefore, if repositories are defined, this must happen as def, not as val. + // ------------------------------------------------------------------------------------------------------------------- + val embeddedRepo = "Embedded Repo" at (info.projectPath / "embedded-repo").asURL.toString // Fast enough => No need for a module configuration here! + val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) + def guiceyFruitRepo = "GuiceyFruit Repo" at "http://guiceyfruit.googlecode.com/svn/repo/releases/" + val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", guiceyFruitRepo) + def jbossRepo = "JBoss Repo" at "https://repository.jboss.org/nexus/content/groups/public/" + val jbossModuleConfig = ModuleConfiguration("org.jboss", jbossRepo) + val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", jbossRepo) + val jgroupsModuleConfig = ModuleConfiguration("jgroups", jbossRepo) + def sunjdmkRepo = "Sun JDMK Repo" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo" + val jmsModuleConfig = ModuleConfiguration("javax.jms", sunjdmkRepo) + val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", sunjdmkRepo) + val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", sunjdmkRepo) + def javaNetRepo = "java.net Repo" at "http://download.java.net/maven/2" + val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", javaNetRepo) + val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", javaNetRepo) + val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", javaNetRepo) + val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", javaNetRepo) + val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) + + /* These are not needed and can possibly be deleted. val databinder = "DataBinder" at "http://databinder.net/repo" // val configgy = "Configgy" at "http://www.lag.net/repo" val codehaus = "Codehaus" at "http://repository.codehaus.org" val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org" - val jboss = "jBoss" at "https://repository.jboss.org/nexus/content/groups/public/" - val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/" val google = "Google" at "http://google-maven-repository.googlecode.com/svn/repository" - val java_net = "java.net" at "http://download.java.net/maven/2" - val scala_tools_snapshots = "scala-tools snapshots" at "http://scala-tools.org/repo-snapshots" - val scala_tools_releases = "scala-tools releases" at "http://scala-tools.org/repo-releases" + */ // ------------------------------------------------------------ // project defintions @@ -106,6 +124,9 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { " dist/akka-jta_%s-%s.jar".format(buildScalaVersion, version) ) + //Exclude slf4j1.5.11 from the classpath, it's conflicting... + override def runClasspath = super.runClasspath --- (super.runClasspath ** "slf4j*1.5.11.jar") + // ------------------------------------------------------------ // publishing override def managedStyle = ManagedStyle.Maven @@ -213,7 +234,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - val camel_core = "org.apache.camel" % "camel-core" % "2.2.0" % "compile" + val camel_core = "org.apache.camel" % "camel-core" % "2.3.0" % "compile" } class AkkaPersistenceCommonProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { @@ -316,11 +337,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaSampleCamelProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) { - val commons_codec = "commons-codec" % "commons-codec" % "1.3" % "compile" val spring_jms = "org.springframework" % "spring-jms" % "3.0.1.RELEASE" % "compile" - val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.2.0" % "compile" - val camel_jms = "org.apache.camel" % "camel-jms" % "2.2.0" % "compile" - val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.0" % "compile" + val camel_jetty = "org.apache.camel" % "camel-jetty" % "2.3.0" % "compile" + val camel_jms = "org.apache.camel" % "camel-jms" % "2.3.0" % "compile" + val activemq_core = "org.apache.activemq" % "activemq-core" % "5.3.2" % "compile" } class AkkaSampleSecurityProject(info: ProjectInfo) extends AkkaDefaultProject(info, deployPath) {