From 4a9fd4e453a898bc16e40c62ef526ceffcca87bf Mon Sep 17 00:00:00 2001 From: Garrick Evans Date: Sat, 20 Nov 2010 23:09:50 -0800 Subject: [PATCH] merge master to branch --- .../src/main/scala/akka/AkkaBroadcaster.scala | 40 ++++ .../main/scala/akka/AkkaCometServlet.scala | 101 +++++++++ .../src/main/scala/SimpleService.scala | 208 ++++++++++++++++++ 3 files changed, 349 insertions(+) create mode 100644 akka-http/src/main/scala/akka/AkkaBroadcaster.scala create mode 100644 akka-http/src/main/scala/akka/AkkaCometServlet.scala create mode 100644 akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala diff --git a/akka-http/src/main/scala/akka/AkkaBroadcaster.scala b/akka-http/src/main/scala/akka/AkkaBroadcaster.scala new file mode 100644 index 0000000000..fd0f76631a --- /dev/null +++ b/akka-http/src/main/scala/akka/AkkaBroadcaster.scala @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.comet + +import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource} + +import akka.actor.Actor._ +import akka.actor.Actor +import akka.dispatch.Dispatchers +import org.atmosphere.jersey.util.JerseyBroadcasterUtil + +object AkkaBroadcaster { + val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher") + + type Event = AtmosphereResourceEvent[_,_] + type Resource = AtmosphereResource[_,_] +} + +class AkkaBroadcaster extends org.atmosphere.jersey.util.JerseySimpleBroadcaster { + import AkkaBroadcaster._ + + //FIXME should be supervised + lazy val caster = actorOf(new Actor { + self.dispatcher = broadcasterDispatcher + def receive = { + case (r: Resource,e: Event) => JerseyBroadcasterUtil.broadcast(r,e) + } + }).start + + override def destroy { + super.destroy + caster.stop + } + + protected override def broadcast(r: Resource, e : Event) { + caster ! ((r,e)) + } +} diff --git a/akka-http/src/main/scala/akka/AkkaCometServlet.scala b/akka-http/src/main/scala/akka/AkkaCometServlet.scala new file mode 100644 index 0000000000..5b15096c92 --- /dev/null +++ b/akka-http/src/main/scala/akka/AkkaCometServlet.scala @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.comet + +import akka.util.Logging + +import java.util.{List => JList} +import javax.servlet.{ServletConfig,ServletContext} +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import com.sun.jersey.spi.container.servlet.ServletContainer + +import org.atmosphere.container.GrizzlyCometSupport +import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver} +import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler} + +class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProcessor { + //Delegate to implement the behavior for AtmosphereHandler + private val handler = new AbstractReflectorAtmosphereHandler { + override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) { + if (event ne null) { + event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, event) + event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER, this) + service(event.getRequest, event.getResponse) + } + } + } + + override def onStateChange(event: AtmosphereResourceEvent[HttpServletRequest, HttpServletResponse]) { + if (event ne null) handler onStateChange event + } + + override def onRequest(resource: AtmosphereResource[HttpServletRequest, HttpServletResponse]) { + handler onRequest resource + } + } + +/** + * Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a + * standard servlet container, e.g. not using the Akka Kernel. + *

+ * Used by the Akka Kernel to bootstrap REST and Comet. + */ +class AkkaServlet extends AtmosphereServlet { + import akka.config.Config.{config => c} + + /* + * Configure Atmosphere and Jersey (default, fall-back values) + */ + addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true") + addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName) + addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true") + addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";")) + addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(",")) + + c.getInt("akka.rest.maxInactiveActivity") foreach { value => addInitParameter(CometSupport.MAX_INACTIVE,value.toString) } + c.getString("akka.rest.cometSupport") foreach { value => addInitParameter("cometSupport",value) } + + /* + * Provide a fallback for default values + */ + override def getInitParameter(key : String) = + Option(super.getInitParameter(key)).getOrElse(initParams get key) + + /* + * Provide a fallback for default values + */ + override def getInitParameterNames() = { + import scala.collection.JavaConversions._ + initParams.keySet.iterator ++ super.getInitParameterNames + } + + /** + * We override this to avoid Atmosphere looking for it's atmosphere.xml file + * Instead we specify what semantics we want in code. + */ + override def loadConfiguration(sc: ServletConfig) { + config.setSupportSession(false) + isBroadcasterSpecified = true + + //The bridge between Atmosphere and Jersey + val servlet = new AtmosphereRestServlet { + //These are needed to make sure that Jersey is reading the config from the outer servlet + override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key) + override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames() + } + + addAtmosphereHandler("/*", servlet, new AkkaBroadcaster) + } + + override lazy val createCometSupportResolver: CometSupportResolver = new DefaultCometSupportResolver(config) { + import scala.collection.JavaConversions._ + + lazy val desiredCometSupport = + Option(AkkaServlet.this.getInitParameter("cometSupport")) filter testClassExists map newCometSupport + + override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] = + desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault)) + } +} diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala new file mode 100644 index 0000000000..5ef2181e3b --- /dev/null +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package sample.rest.scala + +import akka.actor.{SupervisorFactory, Actor} +import akka.actor.Actor._ +import akka.stm._ +import akka.stm.TransactionalMap +import akka.persistence.cassandra.CassandraStorage +import akka.config.Supervision._ +import akka.util.Logging +import scala.xml.NodeSeq +import java.lang.Integer +import java.nio.ByteBuffer +import javax.ws.rs.core.MultivaluedMap +import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam} +import akka.actor.ActorRegistry.actorFor +import org.atmosphere.annotation.{Broadcast, Suspend,Cluster} +import org.atmosphere.util.XSSHtmlFilter +import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} +import org.atmosphere.jersey.Broadcastable + +class Boot { + val factory = SupervisorFactory( + SupervisorConfig( + OneForOneStrategy(List(classOf[Exception]), 3, 100), + Supervise( + actorOf[SimpleServiceActor], + Permanent) :: + Supervise( + actorOf[ChatActor], + Permanent) :: + Supervise( + actorOf[PersistentSimpleServiceActor], + Permanent) + :: Nil)) + factory.newInstance.start +} + +/** + * Try service out by invoking (multiple times): + *

+ * curl http://localhost:9998/scalacount
+ * 
+ * Or browse to the URL from a web browser. + */ +@Path("/scalacount") +class SimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type SimpleServiceActor + //Send it the "Tick" message and expect a NodeSeq back + val result = for{a <- actorFor[SimpleServiceActor] + r <- (a !! "Tick").as[NodeSeq]} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} + +class SimpleServiceActor extends Actor { + private val KEY = "COUNTER" + private var hasStartedTicking = false + private val storage = TransactionalMap[String, Integer]() + + def receive = { + case "Tick" => if (hasStartedTicking) { + val count = atomic { + val current = storage.get(KEY).get.asInstanceOf[Integer].intValue + val updated = current + 1 + storage.put(KEY, new Integer(updated)) + updated + } + self.reply(Tick:{count}) + } else { + atomic { + storage.put(KEY, new Integer(0)) + } + hasStartedTicking = true + self.reply(Tick: 0) + } + } +} + +@Path("/pubsub/") +class PubSub { + @GET + @Suspend + @Produces(Array("text/plain;charset=ISO-8859-1")) + @Path("/topic/{topic}/") + def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic) + + @GET + @Broadcast + @Path("/topic/{topic}/{message}/") + @Produces(Array("text/plain;charset=ISO-8859-1")) + def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic) +} + +/** + * Try service out by invoking (multiple times): + *
+ * curl http://localhost:9998/persistentscalacount
+ * 
+ * Or browse to the URL from a web browser. + */ +@Path("/persistentscalacount") +class PersistentSimpleService { + @GET + @Produces(Array("text/html")) + def count = { + //Fetch the first actor of type PersistentSimpleServiceActor + //Send it the "Tick" message and expect a NodeSeq back + val result = for{a <- actorFor[PersistentSimpleServiceActor] + r <- (a !! "Tick").as[NodeSeq]} yield r + //Return either the resulting NodeSeq or a default one + result getOrElse Error in counter + } +} + +class PersistentSimpleServiceActor extends Actor { + private val KEY = "COUNTER" + private var hasStartedTicking = false + private lazy val storage = CassandraStorage.newMap + + def receive = { + case "Tick" => if (hasStartedTicking) { + val count = atomic { + val bytes = storage.get(KEY.getBytes).get + val current = Integer.parseInt(new String(bytes, "UTF8")) + val updated = current + 1 + storage.put(KEY.getBytes, (updated).toString.getBytes) + updated + } +// val bytes = storage.get(KEY.getBytes).get +// val counter = ByteBuffer.wrap(bytes).getInt +// storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array) + self.reply(Tick:{count}) + } else { + atomic { + storage.put(KEY.getBytes, "0".getBytes) + } +// storage.put(KEY.getBytes, Array(0.toByte)) + hasStartedTicking = true + self.reply(Tick: 0) + } + } +} + +@Path("/chat") +class Chat { + import ChatActor.ChatMsg + @Suspend + @GET + @Produces(Array("text/html")) + def suspend = () + + @POST + @Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter])) + @Consumes(Array("application/x-www-form-urlencoded")) + @Produces(Array("text/html")) + def publishMessage(form: MultivaluedMap[String, String]) = { + val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message")) + //Fetch the first actor of type ChatActor + //Send it the "Tick" message and expect a NodeSeq back + val result = for{a <- actorFor[ChatActor] + r <- (a !! msg).as[String]} yield r + //Return either the resulting String or a default one + result getOrElse "System__error" + } +} + +object ChatActor { + case class ChatMsg(val who: String, val what: String, val msg: String) +} + +class ChatActor extends Actor with Logging { + import ChatActor.ChatMsg + def receive = { + case ChatMsg(who, what, msg) => { + what match { + case "login" => self.reply("System Message__" + who + " has joined.") + case "post" => self.reply("" + who + "__" + msg) + case _ => throw new WebApplicationException(422) + } + } + case x => log.info("recieve unknown: " + x) + } +} + + +class JsonpFilter extends BroadcastFilter with Logging { + def filter(an: AnyRef) = { + val m = an.toString + var name = m + var message = "" + + if (m.indexOf("__") > 0) { + name = m.substring(0, m.indexOf("__")) + message = m.substring(m.indexOf("__") + 2) + } + + new BroadcastFilter.BroadcastAction("\n") + } +}