Mist now integrated in master
This commit is contained in:
commit
cd416a833b
18 changed files with 2039 additions and 372 deletions
|
|
@ -10,7 +10,8 @@ import java.io.File
|
|||
|
||||
import akka.actor.BootableActorLoaderService
|
||||
import akka.util.{Bootable, Logging}
|
||||
import akka.comet.AkkaServlet
|
||||
|
||||
//import akka.comet.AkkaServlet
|
||||
|
||||
import org.eclipse.jetty.xml.XmlConfiguration
|
||||
import org.eclipse.jetty.server.{Handler, Server}
|
||||
116
akka-http/src/main/scala/JettyContinuation.scala
Normal file
116
akka-http/src/main/scala/JettyContinuation.scala
Normal file
|
|
@ -0,0 +1,116 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
import org.eclipse.jetty.server._
|
||||
import org.eclipse.jetty.continuation._
|
||||
import Types._
|
||||
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait JettyContinuation extends ContinuationListener with akka.util.Logging
|
||||
{
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
import MistSettings._
|
||||
|
||||
val builder:() => tAsyncRequestContext
|
||||
val context: Option[tAsyncRequestContext] = Some(builder())
|
||||
def go = _continuation.isDefined
|
||||
|
||||
protected val _continuation:Option[AsyncContinuation] = {
|
||||
|
||||
val continuation = context.get.asInstanceOf[AsyncContinuation]
|
||||
|
||||
(continuation.isInitial,
|
||||
continuation.isSuspended,
|
||||
continuation.isExpired) match {
|
||||
|
||||
//
|
||||
// the fresh continuation (coming through getAsyncContinuation)
|
||||
//
|
||||
case (true, false, false) => {
|
||||
continuation.setTimeout(DefaultTimeout)
|
||||
|
||||
continuation.addContinuationListener(this)
|
||||
continuation.suspend
|
||||
|
||||
Some(continuation)
|
||||
}
|
||||
//
|
||||
// the fresh continuation (coming through startAsync instead)
|
||||
//
|
||||
case (true, true, false) => {
|
||||
|
||||
continuation.setTimeout(DefaultTimeout)
|
||||
continuation.addContinuationListener(this)
|
||||
|
||||
Some(continuation)
|
||||
}
|
||||
// the timeout was reset and the continuation was resumed
|
||||
// need to update the timeout and resuspend
|
||||
// very important to clear the context so the request is not rebroadcast to the endpoint
|
||||
case (false, false, false) => {
|
||||
|
||||
continuation.setTimeout(continuation.getAttribute(TimeoutAttribute).asInstanceOf[Long])
|
||||
continuation.suspend
|
||||
continuation.removeAttribute(TimeoutAttribute)
|
||||
|
||||
None
|
||||
}
|
||||
//
|
||||
// we don't actually expect to get this one here since the listener will finish him off
|
||||
//
|
||||
case (_, _, true) => {
|
||||
|
||||
None
|
||||
}
|
||||
//
|
||||
// snuh?
|
||||
//
|
||||
case _ => {
|
||||
continuation.cancel
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def suspended:Boolean = _continuation match {
|
||||
case None => false
|
||||
case Some(continuation) => (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) ne null))
|
||||
}
|
||||
|
||||
def timeout(ms:Long):Boolean = _continuation match {
|
||||
case None => false
|
||||
case Some(continuation) =>
|
||||
continuation.setAttribute(TimeoutAttribute, ms)
|
||||
continuation.resume
|
||||
true
|
||||
}
|
||||
|
||||
//
|
||||
// ContinuationListener
|
||||
//
|
||||
def onComplete(c: Continuation) = {}
|
||||
def onTimeout(c: Continuation) = {
|
||||
c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
|
||||
c.complete
|
||||
}
|
||||
}
|
||||
|
||||
object JettyContinuationMethodFactory extends RequestMethodFactory {
|
||||
def Delete(f: () => tAsyncRequestContext): RequestMethod = new Delete(f) with JettyContinuation
|
||||
def Get(f: () => tAsyncRequestContext): RequestMethod = new Get(f) with JettyContinuation
|
||||
def Head(f: () => tAsyncRequestContext): RequestMethod = new Head(f) with JettyContinuation
|
||||
def Options(f: () => tAsyncRequestContext): RequestMethod = new Options(f) with JettyContinuation
|
||||
def Post(f: () => tAsyncRequestContext): RequestMethod = new Post(f) with JettyContinuation
|
||||
def Put(f: () => tAsyncRequestContext): RequestMethod = new Put(f) with JettyContinuation
|
||||
def Trace(f: () => tAsyncRequestContext): RequestMethod = new Trace(f) with JettyContinuation
|
||||
}
|
||||
|
||||
|
||||
466
akka-http/src/main/scala/Mist.scala
Normal file
466
akka-http/src/main/scala/Mist.scala
Normal file
|
|
@ -0,0 +1,466 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
import akka.util.Logging
|
||||
import javax.servlet.http.HttpServlet
|
||||
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
||||
import akka.actor.{ActorRegistry, ActorRef, Actor}
|
||||
import javax.servlet.Filter
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
object MistSettings {
|
||||
import akka.config.Config._
|
||||
|
||||
final val JettyServer = "jetty"
|
||||
final val TimeoutAttribute = "timeout"
|
||||
|
||||
val ConnectionClose = config.getBool("akka.http.connection-close", true)
|
||||
val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
|
||||
val RootActorID = config.getString("akka.http.root-actor-id", "_httproot")
|
||||
val DefaultTimeout = config.getLong("akka.http.timeout", 1000)
|
||||
val ExpiredHeaderName = config.getString("akka.http.expired-header-name", "Async-Timeout")
|
||||
val ExpiredHeaderValue = config.getString("akka.http.expired-header-value", "expired")
|
||||
}
|
||||
|
||||
/**
|
||||
* Structural type alias's required to work with both Servlet 3.0 and Jetty's Continuation API
|
||||
*
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
object Types {
|
||||
import javax.servlet. {ServletRequest, ServletResponse}
|
||||
|
||||
/**
|
||||
* Represents an asynchronous request
|
||||
*/
|
||||
type tAsyncRequest = {
|
||||
def startAsync: tAsyncRequestContext
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to match both AsyncContext and AsyncContinuation in order to complete the request
|
||||
*/
|
||||
type tAsyncRequestContext = {
|
||||
def complete: Unit
|
||||
def getRequest: ServletRequest
|
||||
def getResponse: ServletResponse
|
||||
}
|
||||
|
||||
type Header = Tuple2[String,String]
|
||||
type Headers = List[Header]
|
||||
|
||||
def Headers(): Headers = Nil
|
||||
}
|
||||
|
||||
import Types._
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
trait Mist extends Logging {
|
||||
import javax.servlet.{ServletContext}
|
||||
import MistSettings._
|
||||
|
||||
/**
|
||||
* The root endpoint actor
|
||||
*/
|
||||
protected val _root = ActorRegistry.actorsFor(RootActorID).head
|
||||
|
||||
/**
|
||||
* Server-specific method factory
|
||||
*/
|
||||
protected var _factory: Option[RequestMethodFactory] = None
|
||||
|
||||
/**
|
||||
* Handles all servlet requests
|
||||
*/
|
||||
protected def mistify(request: HttpServletRequest,
|
||||
response: HttpServletResponse)
|
||||
(builder: (() => tAsyncRequestContext) => RequestMethod) = {
|
||||
def suspend: tAsyncRequestContext = {
|
||||
//
|
||||
// set to right now, which is effectively "already expired"
|
||||
//
|
||||
response.setDateHeader("Expires", System.currentTimeMillis)
|
||||
response.setHeader("Cache-Control", "no-cache, must-revalidate")
|
||||
|
||||
//
|
||||
// no keep-alive?
|
||||
//
|
||||
if (ConnectionClose) response.setHeader("Connection","close")
|
||||
|
||||
//
|
||||
// suspend the request
|
||||
// TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
|
||||
//
|
||||
request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
|
||||
}
|
||||
|
||||
//
|
||||
// shoot the message to the root endpoint for processing
|
||||
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
|
||||
//
|
||||
val method = builder(suspend _)
|
||||
if (method.go) _root ! method
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up what mist needs to be able to service requests
|
||||
* must be called prior to dispatching to "mistify"
|
||||
*/
|
||||
def initMist(context: ServletContext) {
|
||||
val server = context.getServerInfo
|
||||
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
|
||||
|
||||
log.info("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor)
|
||||
|
||||
_factory = if (major >= 3) {
|
||||
log.info("Supporting Java asynchronous contexts.")
|
||||
Some(Servlet30ContextMethodFactory)
|
||||
} else if (server.toLowerCase startsWith JettyServer) {
|
||||
log.info("Supporting Jetty asynchronous continuations.")
|
||||
Some(JettyContinuationMethodFactory)
|
||||
} else {
|
||||
log.error("No asynchronous request handling can be supported.")
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* AkkaMistServlet adds support to bridge Http and Actors in an asynchronous fashion
|
||||
* Async impls currently supported: Servlet3.0, Jetty Continuations
|
||||
*/
|
||||
class AkkaMistServlet extends HttpServlet with Mist {
|
||||
import javax.servlet.{ServletConfig}
|
||||
|
||||
/**
|
||||
* Initializes Mist
|
||||
*/
|
||||
override def init(config: ServletConfig) {
|
||||
super.init(config)
|
||||
initMist(config.getServletContext)
|
||||
}
|
||||
|
||||
protected override def doDelete(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Delete)
|
||||
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Get)
|
||||
protected override def doHead(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Head)
|
||||
protected override def doOptions(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Options)
|
||||
protected override def doPost(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Post)
|
||||
protected override def doPut(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Put)
|
||||
protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Trace)
|
||||
}
|
||||
|
||||
/**
|
||||
* Proof-of-concept, use at own risk
|
||||
* Will be officially supported in a later release
|
||||
*/
|
||||
class AkkaMistFilter extends Filter with Mist {
|
||||
import javax.servlet.{ServletRequest, ServletResponse, FilterConfig, FilterChain}
|
||||
|
||||
/**
|
||||
* Initializes Mist
|
||||
*/
|
||||
def init(config: FilterConfig) {
|
||||
initMist(config.getServletContext)
|
||||
}
|
||||
|
||||
/**
|
||||
* Decide how/if to handle the request
|
||||
*/
|
||||
override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain) {
|
||||
(req,res) match {
|
||||
case (hreq: HttpServletRequest, hres: HttpServletResponse) =>
|
||||
hreq.getMethod.toUpperCase match {
|
||||
case "DELETE" => mistify(hreq, hres)(_factory.get.Delete)
|
||||
case "GET" => mistify(hreq, hres)(_factory.get.Get)
|
||||
case "HEAD" => mistify(hreq, hres)(_factory.get.Head)
|
||||
case "OPTIONS" => mistify(hreq, hres)(_factory.get.Options)
|
||||
case "POST" => mistify(hreq, hres)(_factory.get.Post)
|
||||
case "PUT" => mistify(hreq, hres)(_factory.get.Put)
|
||||
case "TRACE" => mistify(hreq, hres)(_factory.get.Trace)
|
||||
case unknown => log.warn("Unknown http method: %s",unknown)
|
||||
}
|
||||
chain.doFilter(req,res)
|
||||
case _ => chain.doFilter(req,res)
|
||||
}
|
||||
}
|
||||
|
||||
override def destroy {}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
// Endpoints
|
||||
///////////////////////////////////////////
|
||||
|
||||
object Endpoint {
|
||||
import akka.dispatch.Dispatchers
|
||||
|
||||
/**
|
||||
* leverage the akka config to tweak the dispatcher for our endpoints
|
||||
*/
|
||||
final val Dispatcher = Dispatchers.fromConfig("akka.http.comet-dispatcher")
|
||||
|
||||
type Hook = Function[String, Boolean]
|
||||
type Provider = Function[String, ActorRef]
|
||||
|
||||
case class Attach(hook: Hook, provider: Provider)
|
||||
case class NoneAvailable(uri: String, req: RequestMethod)
|
||||
}
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait Endpoint { this: Actor =>
|
||||
|
||||
import Endpoint._
|
||||
|
||||
/**
|
||||
* A convenience method to get the actor ref
|
||||
*/
|
||||
def actor: ActorRef = this.self
|
||||
|
||||
/**
|
||||
* The list of connected endpoints to which this one should/could forward the request.
|
||||
* If the hook func returns true, the message will be sent to the actor returned from provider.
|
||||
*/
|
||||
protected var _attachments = List[Tuple2[Hook, Provider]]()
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
protected def _attach(hook:Hook, provider:Provider) = _attachments = (hook, provider) :: _attachments
|
||||
|
||||
/**
|
||||
* Message handling common to all endpoints, must be chained
|
||||
*/
|
||||
protected def handleHttpRequest: Receive = {
|
||||
//
|
||||
// add the endpoint - the if the uri hook matches,
|
||||
// the message will be sent to the actor returned by the provider func
|
||||
//
|
||||
case Attach(hook, provider) => _attach(hook, provider)
|
||||
|
||||
//
|
||||
// dispatch the suspended requests
|
||||
//
|
||||
case req: RequestMethod => {
|
||||
val uri = req.request.getRequestURI
|
||||
val endpoints = _attachments.filter { _._1(uri) }
|
||||
|
||||
if (!endpoints.isEmpty)
|
||||
endpoints.foreach { _._2(uri) ! req }
|
||||
else {
|
||||
self.sender match {
|
||||
case Some(s) => s reply NoneAvailable(uri, req)
|
||||
case None => _na(uri, req)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* no endpoint available - completes the request with a 404
|
||||
*/
|
||||
protected def _na(uri: String, req: RequestMethod) = {
|
||||
req.NotFound("No endpoint available for [" + uri + "]")
|
||||
log.debug("No endpoint available for [" + uri + "]")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class RootEndpoint extends Actor with Endpoint {
|
||||
import Endpoint._
|
||||
import MistSettings._
|
||||
|
||||
final val Root = "/"
|
||||
|
||||
//
|
||||
// use the configurable dispatcher
|
||||
//
|
||||
self.dispatcher = Endpoint.Dispatcher
|
||||
|
||||
//
|
||||
// adopt the configured id
|
||||
//
|
||||
if (RootActorBuiltin) self.id = RootActorID
|
||||
|
||||
override def preStart =
|
||||
_attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
|
||||
|
||||
//TODO: Is this needed?
|
||||
//override def postRestart =
|
||||
// _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
|
||||
|
||||
def recv: Receive = {
|
||||
case NoneAvailable(uri, req) => _na(uri, req)
|
||||
case unknown => log.error("Unexpected message sent to root endpoint. [" + unknown + "]")
|
||||
}
|
||||
|
||||
/**
|
||||
* Note that root is a little different, other endpoints should chain their own recv first
|
||||
*/
|
||||
def receive = handleHttpRequest orElse recv
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
// RequestMethods
|
||||
///////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Basic description of the suspended async http request.
|
||||
* Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations)
|
||||
*
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait RequestMethod extends Logging
|
||||
{
|
||||
import java.io.IOException
|
||||
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
||||
|
||||
//
|
||||
// required implementations
|
||||
//
|
||||
|
||||
val builder: () => tAsyncRequestContext
|
||||
|
||||
/**
|
||||
* Provides a general type for the underlying context
|
||||
*
|
||||
* @return a completable request context
|
||||
*/
|
||||
val context: Option[tAsyncRequestContext]
|
||||
def go:Boolean
|
||||
|
||||
/**
|
||||
* Updates (resets) the timeout
|
||||
*
|
||||
* @return true if updated, false if not supported
|
||||
*/
|
||||
def timeout(ms: Long): Boolean
|
||||
|
||||
/**
|
||||
* Status of the suspension
|
||||
*/
|
||||
def suspended: Boolean
|
||||
|
||||
//
|
||||
// convenience funcs
|
||||
//
|
||||
|
||||
def request = context.get.getRequest.asInstanceOf[HttpServletRequest]
|
||||
def response = context.get.getResponse.asInstanceOf[HttpServletResponse]
|
||||
|
||||
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
|
||||
request.getHeader(name) match {
|
||||
case null => default(null)
|
||||
case s => s
|
||||
}
|
||||
|
||||
def getParameterOrElse(name: String, default: Function[Any, String]): String =
|
||||
request.getParameter(name) match {
|
||||
case null => default(null)
|
||||
case s => s
|
||||
}
|
||||
|
||||
|
||||
def complete(status: Int, body: String): Boolean = complete(status, body, Headers())
|
||||
|
||||
def complete(status: Int, body: String, headers: Headers): Boolean =
|
||||
context match {
|
||||
case Some(pipe) => {
|
||||
try {
|
||||
if (!suspended) {
|
||||
log.warning("Attempt to complete an expired connection.")
|
||||
false
|
||||
}
|
||||
else {
|
||||
response.setStatus(status)
|
||||
headers foreach {h => response.setHeader(h._1, h._2)}
|
||||
response.getWriter.write(body)
|
||||
response.getWriter.close
|
||||
response.flushBuffer
|
||||
pipe.complete
|
||||
true
|
||||
}
|
||||
} catch {
|
||||
case io =>
|
||||
log.error(io, "Failed to write data to connection on resume - the client probably disconnected")
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
case None =>
|
||||
log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")")
|
||||
false
|
||||
}
|
||||
|
||||
def complete(t: Throwable) {
|
||||
context match {
|
||||
case Some(pipe) => {
|
||||
try {
|
||||
if (!suspended) {
|
||||
log.warning("Attempt to complete an expired connection.")
|
||||
}
|
||||
else {
|
||||
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
|
||||
pipe.complete
|
||||
}
|
||||
}
|
||||
catch {
|
||||
case io: IOException => log.error(io, "Request completed with internal error.")
|
||||
}
|
||||
finally {
|
||||
log.error(t, "Request completed with internal error.")
|
||||
}
|
||||
}
|
||||
|
||||
case None =>
|
||||
log.error(t, "Attempt to complete request with no context")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility methods to send responses back
|
||||
*/
|
||||
|
||||
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
|
||||
def OK(body: String, headers:Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
|
||||
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)
|
||||
def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body)
|
||||
def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body)
|
||||
def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body)
|
||||
def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body)
|
||||
def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body)
|
||||
def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body)
|
||||
def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body)
|
||||
def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body)
|
||||
def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body)
|
||||
def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body)
|
||||
def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body)
|
||||
def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body)
|
||||
def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString)))
|
||||
}
|
||||
|
||||
abstract class Delete(val builder: () => tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Get(val builder: () => tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Head(val builder: () => tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Options(val builder: () => tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Post(val builder: () => tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Put(val builder: () => tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Trace(val builder: () => tAsyncRequestContext) extends RequestMethod
|
||||
|
||||
trait RequestMethodFactory {
|
||||
def Delete(f: () => tAsyncRequestContext): RequestMethod
|
||||
def Get(f: () => tAsyncRequestContext): RequestMethod
|
||||
def Head(f: () => tAsyncRequestContext): RequestMethod
|
||||
def Options(f: () => tAsyncRequestContext): RequestMethod
|
||||
def Post(f: () => tAsyncRequestContext): RequestMethod
|
||||
def Put(f: () => tAsyncRequestContext): RequestMethod
|
||||
def Trace(f: () => tAsyncRequestContext): RequestMethod
|
||||
}
|
||||
69
akka-http/src/main/scala/Servlet30Context.scala
Normal file
69
akka-http/src/main/scala/Servlet30Context.scala
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
import javax.servlet. {AsyncContext, AsyncListener, AsyncEvent};
|
||||
import Types._
|
||||
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait Servlet30Context extends AsyncListener with akka.util.Logging
|
||||
{
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
import MistSettings._
|
||||
|
||||
val builder: () => tAsyncRequestContext
|
||||
val context: Option[tAsyncRequestContext] = Some(builder())
|
||||
def go = context.isDefined
|
||||
|
||||
protected val _ac: AsyncContext = {
|
||||
val ac = context.get.asInstanceOf[AsyncContext]
|
||||
ac setTimeout DefaultTimeout
|
||||
ac addListener this
|
||||
ac
|
||||
}
|
||||
|
||||
def suspended = true
|
||||
|
||||
def timeout(ms: Long): Boolean = {
|
||||
try {
|
||||
_ac setTimeout ms
|
||||
true
|
||||
}
|
||||
catch {
|
||||
case ex: IllegalStateException =>
|
||||
log.info("Cannot update timeout - already returned to container")
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// AsyncListener
|
||||
//
|
||||
def onComplete(e: AsyncEvent) {}
|
||||
def onError(e: AsyncEvent) = e.getThrowable match {
|
||||
case null => log.warning("Error occured...")
|
||||
case t => log.warning(t, "Error occured")
|
||||
}
|
||||
def onStartAsync(e: AsyncEvent) {}
|
||||
def onTimeout(e: AsyncEvent) = {
|
||||
e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
|
||||
e.getAsyncContext.complete
|
||||
}
|
||||
}
|
||||
|
||||
object Servlet30ContextMethodFactory extends RequestMethodFactory {
|
||||
def Delete(f: () => tAsyncRequestContext): RequestMethod = new Delete(f) with Servlet30Context
|
||||
def Get(f: () => tAsyncRequestContext): RequestMethod = new Get(f) with Servlet30Context
|
||||
def Head(f: () => tAsyncRequestContext): RequestMethod = new Head(f) with Servlet30Context
|
||||
def Options(f: () => tAsyncRequestContext): RequestMethod = new Options(f) with Servlet30Context
|
||||
def Post(f: () => tAsyncRequestContext): RequestMethod = new Post(f) with Servlet30Context
|
||||
def Put(f: () => tAsyncRequestContext): RequestMethod = new Put(f) with Servlet30Context
|
||||
def Trace(f: () => tAsyncRequestContext): RequestMethod = new Trace(f) with Servlet30Context
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -1,40 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.comet
|
||||
|
||||
import org.atmosphere.cpr.{AtmosphereResourceEvent, AtmosphereResource}
|
||||
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.Actor
|
||||
import akka.dispatch.Dispatchers
|
||||
import org.atmosphere.jersey.util.JerseyBroadcasterUtil
|
||||
|
||||
object AkkaBroadcaster {
|
||||
val broadcasterDispatcher = Dispatchers.fromConfig("akka.http.comet-dispatcher")
|
||||
|
||||
type Event = AtmosphereResourceEvent[_,_]
|
||||
type Resource = AtmosphereResource[_,_]
|
||||
}
|
||||
|
||||
class AkkaBroadcaster extends org.atmosphere.jersey.util.JerseySimpleBroadcaster {
|
||||
import AkkaBroadcaster._
|
||||
|
||||
//FIXME should be supervised
|
||||
lazy val caster = actorOf(new Actor {
|
||||
self.dispatcher = broadcasterDispatcher
|
||||
def receive = {
|
||||
case (r: Resource,e: Event) => JerseyBroadcasterUtil.broadcast(r,e)
|
||||
}
|
||||
}).start
|
||||
|
||||
override def destroy {
|
||||
super.destroy
|
||||
caster.stop
|
||||
}
|
||||
|
||||
protected override def broadcast(r: Resource, e : Event) {
|
||||
caster ! ((r,e))
|
||||
}
|
||||
}
|
||||
|
|
@ -1,101 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.comet
|
||||
|
||||
import akka.util.Logging
|
||||
|
||||
import java.util.{List => JList}
|
||||
import javax.servlet.{ServletConfig,ServletContext}
|
||||
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
|
||||
import com.sun.jersey.spi.container.servlet.ServletContainer
|
||||
|
||||
import org.atmosphere.container.GrizzlyCometSupport
|
||||
import org.atmosphere.cpr.{AtmosphereServlet, AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent,CometSupport,CometSupportResolver,DefaultCometSupportResolver}
|
||||
import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
|
||||
|
||||
class AtmosphereRestServlet extends ServletContainer with AtmosphereServletProcessor {
|
||||
//Delegate to implement the behavior for AtmosphereHandler
|
||||
private val handler = new AbstractReflectorAtmosphereHandler {
|
||||
override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
|
||||
if (event ne null) {
|
||||
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_RESOURCE, event)
|
||||
event.getRequest.setAttribute(AtmosphereServlet.ATMOSPHERE_HANDLER, this)
|
||||
service(event.getRequest, event.getResponse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onStateChange(event: AtmosphereResourceEvent[HttpServletRequest, HttpServletResponse]) {
|
||||
if (event ne null) handler onStateChange event
|
||||
}
|
||||
|
||||
override def onRequest(resource: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
|
||||
handler onRequest resource
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Akka's Comet servlet to be used when deploying actors exposed as Comet (and REST) services in a
|
||||
* standard servlet container, e.g. not using the Akka Kernel.
|
||||
* <p/>
|
||||
* Used by the Akka Kernel to bootstrap REST and Comet.
|
||||
*/
|
||||
class AkkaServlet extends AtmosphereServlet {
|
||||
import akka.config.Config.{config => c}
|
||||
|
||||
/*
|
||||
* Configure Atmosphere and Jersey (default, fall-back values)
|
||||
*/
|
||||
addInitParameter(AtmosphereServlet.DISABLE_ONSTATE_EVENT,"true")
|
||||
addInitParameter(AtmosphereServlet.BROADCASTER_CLASS,classOf[AkkaBroadcaster].getName)
|
||||
addInitParameter(AtmosphereServlet.PROPERTY_USE_STREAM,"true")
|
||||
addInitParameter("com.sun.jersey.config.property.packages",c.getList("akka.http.resource_packages").mkString(";"))
|
||||
addInitParameter("com.sun.jersey.spi.container.ResourceFilters",c.getList("akka.http.filters").mkString(","))
|
||||
|
||||
c.getInt("akka.http.maxInactiveActivity") foreach { value => addInitParameter(CometSupport.MAX_INACTIVE,value.toString) }
|
||||
c.getString("akka.http.cometSupport") foreach { value => addInitParameter("cometSupport",value) }
|
||||
|
||||
/*
|
||||
* Provide a fallback for default values
|
||||
*/
|
||||
override def getInitParameter(key : String) =
|
||||
Option(super.getInitParameter(key)).getOrElse(initParams get key)
|
||||
|
||||
/*
|
||||
* Provide a fallback for default values
|
||||
*/
|
||||
override def getInitParameterNames() = {
|
||||
import scala.collection.JavaConversions._
|
||||
initParams.keySet.iterator ++ super.getInitParameterNames
|
||||
}
|
||||
|
||||
/**
|
||||
* We override this to avoid Atmosphere looking for it's atmosphere.xml file
|
||||
* Instead we specify what semantics we want in code.
|
||||
*/
|
||||
override def loadConfiguration(sc: ServletConfig) {
|
||||
config.setSupportSession(false)
|
||||
isBroadcasterSpecified = true
|
||||
|
||||
//The bridge between Atmosphere and Jersey
|
||||
val servlet = new AtmosphereRestServlet {
|
||||
//These are needed to make sure that Jersey is reading the config from the outer servlet
|
||||
override def getInitParameter(key : String) = AkkaServlet.this.getInitParameter(key)
|
||||
override def getInitParameterNames() = AkkaServlet.this.getInitParameterNames()
|
||||
}
|
||||
|
||||
addAtmosphereHandler("/*", servlet, new AkkaBroadcaster)
|
||||
}
|
||||
|
||||
override lazy val createCometSupportResolver: CometSupportResolver = new DefaultCometSupportResolver(config) {
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
lazy val desiredCometSupport =
|
||||
Option(AkkaServlet.this.getInitParameter("cometSupport")) filter testClassExists map newCometSupport
|
||||
|
||||
override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] =
|
||||
desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault))
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package sample.rest.scala
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.config.Supervision._
|
||||
import akka.http._
|
||||
|
||||
|
||||
/**
|
||||
* Starts up the base services for http (jetty)
|
||||
*/
|
||||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
//
|
||||
// in this particular case, just boot the built-in default root endpoint
|
||||
//
|
||||
Supervise(
|
||||
actorOf[RootEndpoint],
|
||||
Permanent) ::
|
||||
Supervise(
|
||||
actorOf[SimpleAkkaAsyncHttpService],
|
||||
Permanent)
|
||||
:: Nil))
|
||||
factory.newInstance.start
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,187 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package sample.mist
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.http._
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Define a top level service endpoint
|
||||
*
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
class InterestingService extends Actor with Endpoint {
|
||||
//
|
||||
// use the configurable dispatcher
|
||||
//
|
||||
self.dispatcher = Endpoint.Dispatcher
|
||||
|
||||
final val ServiceRoot = "/interesting/"
|
||||
final val Multi = ServiceRoot + "multi/"
|
||||
|
||||
//
|
||||
// The "multi" endpoint shows forking off multiple actions per request
|
||||
// It is triggered by POSTing to http://localhost:9998/interesting/multi/{foo}
|
||||
// Try with/without a header named "Test-Token"
|
||||
// Try with/without a form parameter named "Data"
|
||||
//
|
||||
def hookMultiActionA(uri: String): Boolean = uri startsWith Multi
|
||||
def provideMultiActionA(uri: String): ActorRef = actorOf(new ActionAActor(complete)).start
|
||||
|
||||
def hookMultiActionB(uri: String): Boolean = uri startsWith Multi
|
||||
def provideMultiActionB(uri: String): ActorRef = actorOf(new ActionBActor(complete)).start
|
||||
|
||||
//
|
||||
// this is where you want attach your endpoint hooks
|
||||
//
|
||||
override def preStart {
|
||||
//
|
||||
// we expect there to be one root and that it's already been started up
|
||||
// obviously there are plenty of other ways to obtaining this actor
|
||||
// the point is that we need to attach something (for starters anyway)
|
||||
// to the root
|
||||
//
|
||||
val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head
|
||||
root ! Endpoint.Attach(hookMultiActionA, provideMultiActionA)
|
||||
root ! Endpoint.Attach(hookMultiActionB, provideMultiActionB)
|
||||
}
|
||||
|
||||
//
|
||||
// since this actor isn't doing anything else (i.e. not handling other messages)
|
||||
// just assign the receive func like so...
|
||||
// otherwise you could do something like:
|
||||
// def myrecv = {...}
|
||||
// def receive = myrecv orElse handleHttpRequest
|
||||
//
|
||||
def receive = handleHttpRequest
|
||||
|
||||
|
||||
//
|
||||
// this guy completes requests after other actions have occured
|
||||
//
|
||||
lazy val complete = actorOf[ActionCompleteActor].start
|
||||
}
|
||||
|
||||
class ActionAActor(complete: ActorRef) extends Actor {
|
||||
import javax.ws.rs.core.MediaType
|
||||
|
||||
def receive = {
|
||||
//
|
||||
// handle a post request
|
||||
//
|
||||
case post:Post => {
|
||||
//
|
||||
// the expected content type of the request
|
||||
// similar to @Consumes
|
||||
//
|
||||
if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
|
||||
//
|
||||
// the content type of the response.
|
||||
// similar to @Produces annotation
|
||||
//
|
||||
post.response.setContentType(MediaType.TEXT_HTML)
|
||||
|
||||
//
|
||||
// get the resource name
|
||||
//
|
||||
val name = post.request.getRequestURI.substring("/interesting/multi/".length)
|
||||
val response = if (name.length % 2 == 0)
|
||||
"<p>Action A verified request.</p>"
|
||||
else
|
||||
"<p>Action A could not verify request.</p>"
|
||||
|
||||
post.response.getWriter.write(response)
|
||||
|
||||
//
|
||||
// notify the next actor to coordinate the response
|
||||
//
|
||||
complete ! post
|
||||
}
|
||||
else {
|
||||
post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ActionBActor(complete:ActorRef) extends Actor {
|
||||
import javax.ws.rs.core.MediaType
|
||||
|
||||
def receive = {
|
||||
//
|
||||
// handle a post request
|
||||
//
|
||||
case post:Post => {
|
||||
//
|
||||
// the expected content type of the request
|
||||
// similar to @Consumes
|
||||
//
|
||||
if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
|
||||
//
|
||||
// pull some headers and form params
|
||||
//
|
||||
def default(any: Any): String = ""
|
||||
val token = post.getHeaderOrElse("Test-Token", default)
|
||||
val data = post.getParameterOrElse("Data", default)
|
||||
|
||||
val (resp, status) = (token, data) match {
|
||||
case ("", _) => ("No token provided", HttpServletResponse.SC_FORBIDDEN)
|
||||
case (_, "") => ("No data", HttpServletResponse.SC_ACCEPTED)
|
||||
case _ => ("Data accepted", HttpServletResponse.SC_OK)
|
||||
}
|
||||
|
||||
//
|
||||
// update the response body
|
||||
//
|
||||
post.response.getWriter.write(resp)
|
||||
|
||||
//
|
||||
// notify the next actor to coordinate the response
|
||||
//
|
||||
complete ! (post, status)
|
||||
}
|
||||
else {
|
||||
post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
|
||||
}
|
||||
}
|
||||
|
||||
case other: RequestMethod =>
|
||||
other.NotAllowed("Invalid method for this endpoint")
|
||||
}
|
||||
}
|
||||
|
||||
class ActionCompleteActor extends Actor
|
||||
{
|
||||
import collection.mutable.HashMap
|
||||
|
||||
val requests = HashMap.empty[Int, Int]
|
||||
|
||||
def receive = {
|
||||
case req: RequestMethod =>
|
||||
if (requests contains req.hashCode)
|
||||
complete(req)
|
||||
else
|
||||
requests += (req.hashCode -> 0)
|
||||
|
||||
case t: Tuple2[RequestMethod, Int] =>
|
||||
if (requests contains t._1.hashCode)
|
||||
complete(t._1)
|
||||
else
|
||||
requests += (t._1.hashCode -> t._2)
|
||||
}
|
||||
|
||||
def complete(req:RequestMethod) = requests.remove(req.hashCode) match {
|
||||
case Some(HttpServletResponse.SC_FORBIDDEN) => req.Forbidden("")
|
||||
case Some(HttpServletResponse.SC_ACCEPTED) => req.Accepted("")
|
||||
case Some(_) => req.OK("")
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,144 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package sample.rest.scala
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import akka.http._
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Define a top level service endpoint
|
||||
* Usage: GET or POST to http://localhost:9998/simple/same or http://localhost:9998/simple/new
|
||||
*
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
class SimpleAkkaAsyncHttpService extends Actor with Endpoint {
|
||||
//
|
||||
// use the configurable dispatcher
|
||||
//
|
||||
self.dispatcher = Endpoint.Dispatcher
|
||||
|
||||
final val ServiceRoot = "/simple/"
|
||||
final val ProvideSameActor = ServiceRoot + "same"
|
||||
final val ProvideNewActor = ServiceRoot + "new"
|
||||
|
||||
//
|
||||
// there are different ways of doing this - in this case, we'll use a single hook function
|
||||
// and discriminate in the provider; alternatively we can pair hooks & providers
|
||||
//
|
||||
def hook(uri: String): Boolean = uri match {
|
||||
case ProvideSameActor | ProvideNewActor => true
|
||||
case _ => false
|
||||
}
|
||||
|
||||
def provide(uri: String): ActorRef = uri match {
|
||||
case ProvideSameActor => same
|
||||
case _ => actorOf[BoringActor].start
|
||||
}
|
||||
|
||||
//
|
||||
// this is where you want attach your endpoint hooks
|
||||
//
|
||||
override def preStart {
|
||||
//
|
||||
// we expect there to be one root and that it's already been started up
|
||||
// obviously there are plenty of other ways to obtaining this actor
|
||||
// the point is that we need to attach something (for starters anyway)
|
||||
// to the root
|
||||
//
|
||||
val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head
|
||||
root ! Endpoint.Attach(hook, provide)
|
||||
}
|
||||
|
||||
//
|
||||
// since this actor isn't doing anything else (i.e. not handling other messages)
|
||||
// just assign the receive func like so...
|
||||
// otherwise you could do something like:
|
||||
// def myrecv = {...}
|
||||
// def receive = myrecv orElse handleHttpRequest
|
||||
//
|
||||
def receive = handleHttpRequest
|
||||
|
||||
//
|
||||
// this will be our "same" actor provided with ProvideSameActor endpoint is hit
|
||||
//
|
||||
lazy val same = actorOf[BoringActor].start
|
||||
}
|
||||
|
||||
/**
|
||||
* Define a service handler to respond to some HTTP requests
|
||||
*/
|
||||
class BoringActor extends Actor {
|
||||
import java.util.Date
|
||||
import javax.ws.rs.core.MediaType
|
||||
|
||||
var gets = 0
|
||||
var posts = 0
|
||||
var lastget: Option[Date] = None
|
||||
var lastpost: Option[Date] = None
|
||||
|
||||
def receive = {
|
||||
//
|
||||
// handle a get request
|
||||
//
|
||||
case get: Get => {
|
||||
//
|
||||
// the content type of the response.
|
||||
// similar to @Produces annotation
|
||||
//
|
||||
get.response.setContentType(MediaType.TEXT_HTML)
|
||||
|
||||
//
|
||||
// "work"
|
||||
//
|
||||
gets += 1
|
||||
lastget = Some(new Date)
|
||||
|
||||
//
|
||||
// respond
|
||||
//
|
||||
val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
|
||||
get.OK(res)
|
||||
}
|
||||
|
||||
//
|
||||
// handle a post request
|
||||
//
|
||||
case post:Post => {
|
||||
//
|
||||
// the expected content type of the request
|
||||
// similar to @Consumes
|
||||
//
|
||||
if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) {
|
||||
//
|
||||
// the content type of the response.
|
||||
// similar to @Produces annotation
|
||||
//
|
||||
post.response.setContentType(MediaType.TEXT_HTML)
|
||||
|
||||
//
|
||||
// "work"
|
||||
//
|
||||
posts += 1
|
||||
lastpost = Some(new Date)
|
||||
|
||||
//
|
||||
// respond
|
||||
//
|
||||
val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
|
||||
post.OK(res)
|
||||
}
|
||||
else {
|
||||
post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
|
||||
}
|
||||
}
|
||||
|
||||
case other: RequestMethod => other.NotAllowed("Invalid method for this endpoint")
|
||||
}
|
||||
}
|
||||
|
|
@ -1,208 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package sample.rest.scala
|
||||
|
||||
import akka.actor.{SupervisorFactory, Actor}
|
||||
import akka.actor.Actor._
|
||||
import akka.stm._
|
||||
import akka.stm.TransactionalMap
|
||||
import akka.persistence.cassandra.CassandraStorage
|
||||
import akka.config.Supervision._
|
||||
import akka.util.Logging
|
||||
import scala.xml.NodeSeq
|
||||
import java.lang.Integer
|
||||
import java.nio.ByteBuffer
|
||||
import javax.ws.rs.core.MultivaluedMap
|
||||
import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes,PathParam}
|
||||
import akka.actor.ActorRegistry.actorFor
|
||||
import org.atmosphere.annotation.{Broadcast, Suspend,Cluster}
|
||||
import org.atmosphere.util.XSSHtmlFilter
|
||||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
import org.atmosphere.jersey.Broadcastable
|
||||
|
||||
class Boot {
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
Supervise(
|
||||
actorOf[SimpleServiceActor],
|
||||
Permanent) ::
|
||||
Supervise(
|
||||
actorOf[ChatActor],
|
||||
Permanent) ::
|
||||
Supervise(
|
||||
actorOf[PersistentSimpleServiceActor],
|
||||
Permanent)
|
||||
:: Nil))
|
||||
factory.newInstance.start
|
||||
}
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
* <pre>
|
||||
* curl http://localhost:9998/scalacount
|
||||
* </pre>
|
||||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/scalacount")
|
||||
class SimpleService {
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = {
|
||||
//Fetch the first actor of type SimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorFor[SimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
}
|
||||
}
|
||||
|
||||
class SimpleServiceActor extends Actor {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private val storage = TransactionalMap[String, Integer]()
|
||||
|
||||
def receive = {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
val count = atomic {
|
||||
val current = storage.get(KEY).get.asInstanceOf[Integer].intValue
|
||||
val updated = current + 1
|
||||
storage.put(KEY, new Integer(updated))
|
||||
updated
|
||||
}
|
||||
self.reply(<success>Tick:{count}</success>)
|
||||
} else {
|
||||
atomic {
|
||||
storage.put(KEY, new Integer(0))
|
||||
}
|
||||
hasStartedTicking = true
|
||||
self.reply(<success>Tick: 0</success>)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Path("/pubsub/")
|
||||
class PubSub {
|
||||
@GET
|
||||
@Suspend
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
@Path("/topic/{topic}/")
|
||||
def subscribe(@PathParam("topic") topic: Broadcaster): Broadcastable = new Broadcastable("", topic)
|
||||
|
||||
@GET
|
||||
@Broadcast
|
||||
@Path("/topic/{topic}/{message}/")
|
||||
@Produces(Array("text/plain;charset=ISO-8859-1"))
|
||||
def say(@PathParam("topic") topic: Broadcaster, @PathParam("message") message: String): Broadcastable = new Broadcastable(message, topic)
|
||||
}
|
||||
|
||||
/**
|
||||
* Try service out by invoking (multiple times):
|
||||
* <pre>
|
||||
* curl http://localhost:9998/persistentscalacount
|
||||
* </pre>
|
||||
* Or browse to the URL from a web browser.
|
||||
*/
|
||||
@Path("/persistentscalacount")
|
||||
class PersistentSimpleService {
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def count = {
|
||||
//Fetch the first actor of type PersistentSimpleServiceActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorFor[PersistentSimpleServiceActor]
|
||||
r <- (a !! "Tick").as[NodeSeq]} yield r
|
||||
//Return either the resulting NodeSeq or a default one
|
||||
result getOrElse <error>Error in counter</error>
|
||||
}
|
||||
}
|
||||
|
||||
class PersistentSimpleServiceActor extends Actor {
|
||||
private val KEY = "COUNTER"
|
||||
private var hasStartedTicking = false
|
||||
private lazy val storage = CassandraStorage.newMap
|
||||
|
||||
def receive = {
|
||||
case "Tick" => if (hasStartedTicking) {
|
||||
val count = atomic {
|
||||
val bytes = storage.get(KEY.getBytes).get
|
||||
val current = Integer.parseInt(new String(bytes, "UTF8"))
|
||||
val updated = current + 1
|
||||
storage.put(KEY.getBytes, (updated).toString.getBytes)
|
||||
updated
|
||||
}
|
||||
// val bytes = storage.get(KEY.getBytes).get
|
||||
// val counter = ByteBuffer.wrap(bytes).getInt
|
||||
// storage.put(KEY.getBytes, ByteBuffer.allocate(4).putInt(counter + 1).array)
|
||||
self.reply(<success>Tick:{count}</success>)
|
||||
} else {
|
||||
atomic {
|
||||
storage.put(KEY.getBytes, "0".getBytes)
|
||||
}
|
||||
// storage.put(KEY.getBytes, Array(0.toByte))
|
||||
hasStartedTicking = true
|
||||
self.reply(<success>Tick: 0</success>)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Path("/chat")
|
||||
class Chat {
|
||||
import ChatActor.ChatMsg
|
||||
@Suspend
|
||||
@GET
|
||||
@Produces(Array("text/html"))
|
||||
def suspend = ()
|
||||
|
||||
@POST
|
||||
@Broadcast(Array(classOf[XSSHtmlFilter], classOf[JsonpFilter]))
|
||||
@Consumes(Array("application/x-www-form-urlencoded"))
|
||||
@Produces(Array("text/html"))
|
||||
def publishMessage(form: MultivaluedMap[String, String]) = {
|
||||
val msg = ChatMsg(form.getFirst("name"),form.getFirst("action"),form.getFirst("message"))
|
||||
//Fetch the first actor of type ChatActor
|
||||
//Send it the "Tick" message and expect a NodeSeq back
|
||||
val result = for{a <- actorFor[ChatActor]
|
||||
r <- (a !! msg).as[String]} yield r
|
||||
//Return either the resulting String or a default one
|
||||
result getOrElse "System__error"
|
||||
}
|
||||
}
|
||||
|
||||
object ChatActor {
|
||||
case class ChatMsg(val who: String, val what: String, val msg: String)
|
||||
}
|
||||
|
||||
class ChatActor extends Actor with Logging {
|
||||
import ChatActor.ChatMsg
|
||||
def receive = {
|
||||
case ChatMsg(who, what, msg) => {
|
||||
what match {
|
||||
case "login" => self.reply("System Message__" + who + " has joined.")
|
||||
case "post" => self.reply("" + who + "__" + msg)
|
||||
case _ => throw new WebApplicationException(422)
|
||||
}
|
||||
}
|
||||
case x => log.info("recieve unknown: " + x)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class JsonpFilter extends BroadcastFilter with Logging {
|
||||
def filter(an: AnyRef) = {
|
||||
val m = an.toString
|
||||
var name = m
|
||||
var message = ""
|
||||
|
||||
if (m.indexOf("__") > 0) {
|
||||
name = m.substring(0, m.indexOf("__"))
|
||||
message = m.substring(m.indexOf("__") + 2)
|
||||
}
|
||||
|
||||
new BroadcastFilter.BroadcastAction("<script type='text/javascript'>\n (window.app || window.parent.app).update({ name: \"" +
|
||||
name + "\", message: \"" + message + "\" }); \n</script>\n")
|
||||
}
|
||||
}
|
||||
|
|
@ -89,17 +89,18 @@ akka {
|
|||
http {
|
||||
hostname = "localhost"
|
||||
port = 9998
|
||||
#cometSupport = "org.atmosphere.container.Jetty7CometSupport" # Disregard autodetection, for valid values: http://doc.akkasource.org/comet
|
||||
filters = ["akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
|
||||
resource_packages = ["sample.rest.scala",
|
||||
"sample.rest.java",
|
||||
"sample.security"] # List with all resource packages for your Jersey services
|
||||
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (sample now)
|
||||
|
||||
comet-dispatcher {
|
||||
#type = "Hawt" # Uncomment if you want to use a different dispatcher than the default one for Comet
|
||||
}
|
||||
# maxInactiveActivity = 60000 # Atmosphere CometSupport maxInactiveActivity
|
||||
|
||||
connection-close = true # toggles the addition of the "Connection" response header with a "close" value
|
||||
root-actor-id = "_httproot" # the id of the actor to use as the root endpoint
|
||||
root-actor-builtin = true # toggles the use of the built-in root endpoint base class
|
||||
timeout = 1000 # the default timeout for all async requests (in ms)
|
||||
expired-header-name = "Async-Timeout" # the name of the response header to use when an async request expires
|
||||
expired-header-value = "expired" # the value of the response header to use when an async request expires
|
||||
|
||||
# Uncomment if you are using the KerberosAuthenticationActor
|
||||
# kerberos {
|
||||
|
|
|
|||
|
|
@ -70,10 +70,10 @@
|
|||
<Set name="handlers">
|
||||
<Array type="org.eclipse.jetty.server.Handler">
|
||||
<Item>
|
||||
<New id="AkkaRestHandler" class="org.eclipse.jetty.servlet.ServletContextHandler">
|
||||
<New id="AkkaMistHandler" class="org.eclipse.jetty.servlet.ServletContextHandler">
|
||||
<Set name="contextPath">/</Set>
|
||||
<Call name="addServlet">
|
||||
<Arg>akka.comet.AkkaServlet</Arg>
|
||||
<Arg>akka.http.AkkaMistServlet</Arg>
|
||||
<Arg>/*</Arg>
|
||||
</Call>
|
||||
</New>
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue