merge master to branch
This commit is contained in:
parent
b9de374f69
commit
ae1ae76266
3 changed files with 349 additions and 0 deletions
40
akka-http/src/main/scala/akka/AkkaBroadcaster.scala
Normal file
40
akka-http/src/main/scala/akka/AkkaBroadcaster.scala
Normal file
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.comet
|
||||
|
||||
import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource}
|
||||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.Actor
|
||||
import akka.dispatch.Dispatchers
|
||||
import org.atmosphere.jersey.util.JerseyBroadcasterUtil
|
||||
|
||||
object AkkaBroadcaster {
|
||||
val broadcasterDispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
|
||||
|
||||
type Event = AtmosphereResourceEvent[_,_]
|
||||
type Resource = AtmosphereResource[_,_]
|
||||
}
|
||||
|
||||
class AkkaBroadcaster extends org.atmosphere.jersey.util.JerseySimpleBroadcaster {
|
||||
import AkkaBroadcaster._
|
||||
|
||||
//FIXME should be supervised
|
||||
lazy val caster = actorOf(new Actor {
|
||||
self.dispatcher = broadcasterDispatcher
|
||||
def receive = {
|
||||
case (r: Resource,e: Event) => JerseyBroadcasterUtil.broadcast(r,e)
|
||||
}
|
||||
}).start
|
||||
|
||||
override def destroy {
|
||||
super.destroy
|
||||
caster.stop
|
||||
}
|
||||
|
||||
protected override def broadcast(r: Resource, e : Event) {
|
||||
caster ! ((r,e))
|
||||
}
|
||||
}
|
||||
101
akka-http/src/main/scala/akka/AkkaCometServlet.scala
Normal file
101
akka-http/src/main/scala/akka/AkkaCometServlet.scala
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.comet
|
||||
|
||||
import akka.util.Logging
|
||||
|
||||
import java.util.{List => JList}
|
||||
import javax.servlet.{ServletConfig,ServletContext}
|
||||
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
|
||||
import com.sun.jersey.spi.container.servlet.ServletContainer
|
||||
|
||||
import org.atmosphere.container.GrizzlyCometSupport
|
||||
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
|
||||
import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
|
||||
|
||||
class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProcessor {
|
||||
//Delegate to implement the behavior for AtmosphereHandler
|
||||
private val handler = new AbstractReflectorAtmosphereHandler {
|
||||
override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
|
||||
if (event ne null) {
|
||||
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, event)
|
||||
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER, this)
|
||||
service(event.getRequest, event.getResponse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onStateChange(event: AtmosphereResourceEvent[HttpServletRequest, HttpServletResponse]) {
|
||||
if (event ne null) handler onStateChange event
|
||||
}
|
||||
|
||||
override def onRequest(resource: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
|
||||
handler onRequest resource
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
|
||||
* standard servlet container, e.g. not using the Akka Kernel.
|
||||
* <p/>
|
||||
* Used by the Akka Kernel to bootstrap REST and Comet.
|
||||
*/
|
||||
class AkkaServlet extends AtmosphereServlet {
|
||||
import akka.config.Config.{config => c}
|
||||
|
||||
/*
|
||||
* Configure Atmosphere and Jersey (default, fall-back values)
|
||||
*/
|
||||
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
|
||||
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
|
||||
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
|
||||
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.rest.resource_packages").mkString(";"))
|
||||
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.rest.filters").mkString(","))
|
||||
|
||||
c.getInt("akka.rest.maxInactiveActivity") foreach { value => addInitParameter(CometSupport.MAX_INACTIVE,value.toString) }
|
||||
c.getString("akka.rest.cometSupport") foreach { value => addInitParameter("cometSupport",value) }
|
||||
|
||||
/*
|
||||
* Provide a fallback for default values
|
||||
*/
|
||||
override def getInitParameter(key : String) =
|
||||
Option(super.getInitParameter(key)).getOrElse(initParams get key)
|
||||
|
||||
/*
|
||||
* Provide a fallback for default values
|
||||
*/
|
||||
override def getInitParameterNames() = {
|
||||
import scala.collection.JavaConversions._
|
||||
initParams.keySet.iterator ++ super.getInitParameterNames
|
||||
}
|
||||
|
||||
/**
|
||||
* We override this to avoid Atmosphere looking for it's atmosphere.xml file
|
||||
* Instead we specify what semantics we want in code.
|
||||
*/
|
||||
override def loadConfiguration(sc: ServletConfig) {
|
||||
config.setSupportSession(false)
|
||||
isBroadcasterSpecified = true
|
||||
|
||||
//The bridge between Atmosphere and Jersey
|
||||
val servlet = new AtmosphereRestServlet {
|
||||
//These are needed to make sure that Jersey is reading the config from the outer servlet
|
||||
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
|
||||
override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
|
||||
}
|
||||
|
||||
addAtmosphereHandler("/*", servlet, new AkkaBroadcaster)
|
||||
}
|
||||
|
||||
override lazy val createCometSupportResolver: CometSupportResolver = new DefaultCometSupportResolver(config) {
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
lazy val desiredCometSupport =
|
||||
Option(AkkaServlet.this.getInitParameter("cometSupport")) filter testClassExists map newCometSupport
|
||||
|
||||
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] =
|
||||
desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault))
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,208 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package sample.rest.scala
|
||||
|
||||
import akka.actor.{SupervisorFactory, Actor}
|
||||
import akka.actor.Actor._
|
||||
import akka.stm._
|
||||
import akka.stm.TransactionalMap
|
||||
import akka.persistence.cassandra.CassandraStorage
|
||||
import akka.config.Supervision._
|
||||
import akka.util.Logging
|
||||
import scala.xml.NodeSeq
|
||||
import java.lang.Integer
|
||||
import java.nio.ByteBuffer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
||||
import akka.actor.ActorRegistry.actorFor
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
||||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
import org.atmosphere.jersey.Broadcastable
|
||||
|
||||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
actorOf[SimpleServiceActor],
|
||||
Permanent) ::
|
||||
Supervise(
|
||||
actorOf[ChatActor],
|
||||
Permanent) ::
|
||||
Supervise(
|
||||
actorOf[PersistentSimpleServiceActor],
|
||||
Permanent)
|
||||
:: Nil))
|
||||
factory.newInstance.start
|
||||
}
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
* <pre>
|
||||
* curl http://localhost:9998/scalacount
|
||||
* </pre>
|
||||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/scalacount")
|
||||
class SimpleService {
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = {
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorFor[SimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
}
|
||||
}
|
||||
|
||||
class SimpleServiceActor extends Actor {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = TransactionalMap[String, Integer]()
|
||||
|
||||
def receive = {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
val count = atomic {
|
||||
val current = storage.get(KEY).get.asInstanceOf[Integer].intValue
|
||||
val updated = current + 1
|
||||
storage.put(KEY, new Integer(updated))
|
||||
updated
|
||||
}
|
||||
self.reply(<success>Tick:{count}</success>)
|
||||
} else {
|
||||
atomic {
|
||||
storage.put(KEY, new Integer(0))
|
||||
}
|
||||
hasStartedTicking = true
|
||||
self.reply(<success>Tick: 0</success>)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Path("/pubsub/")
|
||||
class PubSub {
|
||||
@GET
|
||||
@Suspend
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
@Path("/topic/{topic}/")
|
||||
def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
|
||||
|
||||
@GET
|
||||
@Broadcast
|
||||
@Path("/topic/{topic}/{message}/")
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
|
||||
}
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
* <pre>
|
||||
* curl http://localhost:9998/persistentscalacount
|
||||
* </pre>
|
||||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/persistentscalacount")
|
||||
class PersistentSimpleService {
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = {
|
||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorFor[PersistentSimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
}
|
||||
}
|
||||
|
||||
class PersistentSimpleServiceActor extends Actor {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private lazy val storage = CassandraStorage.newMap
|
||||
|
||||
def receive = {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
val count = atomic {
|
||||
val bytes = storage.get(KEY.getBytes).get
|
||||
val current = Integer.parseInt(new String(bytes, "UTF8"))
|
||||
val updated = current + 1
|
||||
storage.put(KEY.getBytes, (updated).toString.getBytes)
|
||||
updated
|
||||
}
|
||||
// val bytes = storage.get(KEY.getBytes).get
|
||||
// val counter = ByteBuffer.wrap(bytes).getInt
|
||||
// storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
|
||||
self.reply(<success>Tick:{count}</success>)
|
||||
} else {
|
||||
atomic {
|
||||
storage.put(KEY.getBytes, "0".getBytes)
|
||||
}
|
||||
// storage.put(KEY.getBytes, Array(0.toByte))
|
||||
hasStartedTicking = true
|
||||
self.reply(<success>Tick: 0</success>)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Path("/chat")
|
||||
class Chat {
|
||||
import ChatActor.ChatMsg
|
||||
@Suspend
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def suspend = ()
|
||||
|
||||
@POST
|
||||
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
|
||||
@Consumes(Array("application/x-www-form-urlencoded"))
|
||||
@Produces(Array("text/html"))
|
||||
def publishMessage(form: MultivaluedMap[String, String]) = {
|
||||
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
|
||||
//Fetch the first actor of type ChatActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorFor[ChatActor]
|
||||
r <- (a !! msg).as[String]} yield r
|
||||
//Return either the resulting String or a default one
|
||||
result getOrElse "System__error"
|
||||
}
|
||||
}
|
||||
|
||||
object ChatActor {
|
||||
case class ChatMsg(val who: String, val what: String, val msg: String)
|
||||
}
|
||||
|
||||
class ChatActor extends Actor with Logging {
|
||||
import ChatActor.ChatMsg
|
||||
def receive = {
|
||||
case ChatMsg(who, what, msg) => {
|
||||
what match {
|
||||
case "login" => self.reply("System Message__" + who + " has joined.")
|
||||
case "post" => self.reply("" + who + "__" + msg)
|
||||
case _ => throw new WebApplicationException(422)
|
||||
}
|
||||
}
|
||||
case x => log.info("recieve unknown: " + x)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class JsonpFilter extends BroadcastFilter with Logging {
|
||||
def filter(an: AnyRef) = {
|
||||
val m = an.toString
|
||||
var name = m
|
||||
var message = ""
|
||||
|
||||
if (m.indexOf("__") > 0) {
|
||||
name = m.substring(0, m.indexOf("__"))
|
||||
message = m.substring(m.indexOf("__") + 2)
|
||||
}
|
||||
|
||||
new BroadcastFilter.BroadcastAction("<script type='text/javascript'>\n (window.app || window.parent.app).update({ name: \"" +
|
||||
name + "\", message: \"" + message + "\" }); \n</script>\n")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue