adding back (mist) http work in a new branch. misitfy was too stale. this is WIP - trying to support both SAPI 3.0 and Jetty Continuations at once

This commit is contained in:
Garrick Evans 2010-11-08 10:54:43 -08:00 committed by Garrick Evans
parent 7668813382
commit 9136e62cd2
6 changed files with 745 additions and 0 deletions

View file

@ -0,0 +1,109 @@
/**
* Copyright 2010 Autodesk, Inc. All rights reserved.
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
*/
package akka.http
import akka.util.Logging
import javax.servlet.http.{HttpServletResponse, HttpServlet}
/**
* @author Garrick Evans
*/
class AsyncHttpServlet extends HttpServlet with Logging
{
import java.util. {Date, TimeZone}
import java.text.SimpleDateFormat
import javax.servlet.ServletConfig
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import akka.actor.ActorRegistry
import Types._
//
// the root endpoint for this servlet will have been booted already
// use the system property to find out the actor id and cache him
// TODO: currently this is hardcoded but really use a property
//
protected val _root = ActorRegistry.actorsFor("DefaultGridRoot").head
/**
* Handles the HTTP request method on the servlet, suspends the connection and sends the asynchronous context
* along to the root endpoint in a SuspendedRequest message
*/
protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder: (()=>Option[tAsyncContext]) => SuspendedRequest) =
{
def suspend:Option[tAsyncContext] =
{
//
// set to effectively "already expired"
//
val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
gmt.setTimeZone(TimeZone.getTimeZone("GMT"))
response.setHeader("Expires", gmt.format(new Date))
response.setHeader("Cache-Control", "no-cache, must-revalidate")
response.setHeader("Connection","close")
Some(request.asInstanceOf[tAsyncRequest].startAsync)
}
//
// shoot the message to the root endpoint for processing
// IMPORTANT: the suspend method is invoked on the jetty thread not in the actor
//
val msg = builder(suspend _)
if (msg.context ne None) {_root ! msg}
}
/**
* Subclasses can choose to have the servlet listen to the async context events
* @return A type of either AsyncListener or ContinuationListener
*/
def hook:Option[AnyRef] = None
//
// HttpServlet API
//
final val Jetty7Server = "Jetty(7"
override def init(config: ServletConfig) =
{
super.init(config)
val context = config.getServletContext
val server = context.getServerInfo
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
log.debug("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor)
(major, minor) match {
case (3,0) => {
log.debug("Supporting Java asynchronous contexts.")
}
case (2,5) if (server startsWith Jetty7Server) => {
log.debug("Supporting Jetty asynchronous continuations.")
}
case _ => {
log.error("No asynchronous request handling can be supported.")
}
}
}
protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Delete(f, hook _))
protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Get(f, hook _))
protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Head(f, hook _))
protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Options(f, hook _))
protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Post(f, hook _))
protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Put(f, hook _))
protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Trace(f, hook _))
}

View file

@ -0,0 +1,134 @@
/**
* Copyright 2010 Autodesk, Inc. All rights reserved.
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
*/
package akka.http
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import akka.actor.{ActorRegistry, ActorRef, Actor}
/**
* @author Garrick Evans
*/
trait Endpoint
{
this: Actor =>
import Endpoint._
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
/**
* A convenience method to get the actor ref
*/
def actor: ActorRef = this.self
/**
* The list of connected endpoints to which this one should/could forward the request.
* If the hook func returns true, the message will be sent to the actor returned from provider.
*/
protected var _attachments = List[Tuple2[Hook, Provider]]()
/**
*
*/
protected def _attach(hook:Hook, provider:Provider) =
{
_attachments = (hook, provider) :: _attachments
}
/**
* Message handling common to all endpoints, must be chained
*/
protected def _recv: Receive =
{
//
// add the endpoint - the if the uri hook matches,
// the message will be sent to the actor returned by the provider func
//
case Attach(hook, provider) => _attach(hook, provider)
//
// dispatch the suspended requests
//
case msg if msg.isInstanceOf[SuspendedRequest] =>
{
val req = msg.asInstanceOf[SuspendedRequest]
val uri = req.request.getRequestURI
val endpoints = _attachments.filter {_._1(uri)}
if (endpoints.size > 0)
endpoints.foreach {_._2(uri) ! req}
else
{
self.sender match
{
case Some(s) => s reply NoneAvailable(uri, req)
case None => _na(uri, req)
}
}
}
}
/**
* no endpoint available - completes the request with a 404
*/
protected def _na(uri: String, req: SuspendedRequest) =
{
req.NotFound("No endpoint available for [" + uri + "]")
log.debug("No endpoint available for [" + uri + "]")
}
}
class RootEndpoint extends Actor with Endpoint
{
import Endpoint._
final val Root = "/"
//
// use the configurable dispatcher
//
self.dispatcher = Endpoint.Dispatcher
override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
def recv: Receive =
{
case NoneAvailable(uri, req) => _na(uri, req)
case unknown =>
{
log.error("Unexpected message sent to root endpoint. [" + unknown.toString + "]")
}
}
/**
* Note that root is a little different, other endpoints should chain their own recv first
*/
def receive = {_recv orElse recv}
}
object Endpoint
{
import akka.dispatch.Dispatchers
/**
* leverage the akka config to tweak the dispatcher for our endpoints
*/
final val Dispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
case class Attach(hook: Hook, provider: Provider)
case class NoneAvailable(uri: String, req: SuspendedRequest)
}

View file

@ -0,0 +1,252 @@
/**
* Copyright 2010 Autodesk, Inc. All rights reserved.
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
*/
package akka.http
import akka.util.Logging
import Types._
/**
* @author Garrick Evans
*/
trait SuspendedRequest extends Logging
{
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import org.eclipse.jetty.server._
import org.eclipse.jetty.continuation._
final val Timeout = "timeout"
final val DefaultTimeout = 30000
var context: Option[tAsyncContext] = None
def init(suspend:()=>Option[tAsyncContext], callback:()=>Option[AnyRef]) =
{
suspend() match
{
case (Some(continuation)) =>
{
context = Some(continuation)
val ac = continuation.asInstanceOf[AsyncContinuation]
(ac.isInitial, ac.isSuspended, ac.isExpired) match
{
//
// the fresh continuation
//
case (true, false, false) =>
{
ac.setTimeout(DefaultTimeout)
//callback() foreach {listener => ac.addContinuationListener(listener)}
//ac.addContinuationListener(this)
ac.suspend
}
//
// the timeout was reset and the continuation was resumed
// need to update the timeout and resuspend
// very important to clear the context so the request is not rebroadcast to the endpoint
//
case (false, false, false) =>
{
ac.setTimeout(ac.getAttribute(Timeout).asInstanceOf[Long])
ac.suspend
ac.removeAttribute(Timeout)
context = None
log.debug("Updating and re-suspending request. TIMEOUT ("+ac.getTimeout+" ms)")
}
//
// we don't actually expect to get this one here since the listener will finish him off
//
case (_, _, true) =>
{
response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT)
context = None
log.warning("Expired request arrived here unexpectedly. REQUEST ("+continuation.toString+")")
}
case unknown =>
{
log.error("Unexpected continuation state detected - cancelling")
ac.cancel
context = None
}
}
}
case _ =>
{
log.error("Cannot initialize request without an asynchronous context.")
}
}
}
def request = context.get.getRequest.asInstanceOf[HttpServletRequest]
def response = context.get.getResponse.asInstanceOf[HttpServletResponse]
def suspended =
{
context match
{
case Some(continuation) =>
{
val ac = continuation.asInstanceOf[AsyncContinuation]
(ac.isSuspended || (ac.getAttribute(Timeout) != null))
}
case None => false
}
}
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
{
request.getHeader(name) match
{
case null => default(null)
case s => s
}
}
def getParameterOrElse(name: String, default: Function[Any, String]): String =
{
request.getParameter(name) match
{
case null => default(null)
case s => s
}
}
/**
* Allow for an updatable timeout
*/
def timeout(ms:Long):Unit =
{
context match
{
case Some(continuation) =>
{
continuation.asInstanceOf[AsyncContinuation].setAttribute(Timeout, ms)
continuation.asInstanceOf[AsyncContinuation].resume
}
case None => log.error("Cannot update the timeout on an unsuspended request")
}
}
def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]())
def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean =
{
var ok = false
context match
{
case Some(pipe) =>
{
try
{
if (!suspended)
{
log.warning("Attempt to complete an expired connection.")
}
else
{
response.setStatus(status)
headers foreach {h => response.setHeader(h._1, h._2)}
response.getWriter.write(body)
response.getWriter.close
response.flushBuffer
pipe.complete
ok = true
}
}
catch
{
case ex => log.error(ex, "Failed to write data to connection on resume - the client probably disconnected")
}
finally
{
context = None
}
}
case None =>
{
log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")")
}
}
ok
}
def complete(t: Throwable): Unit =
{
var status = 0
context match
{
case Some(pipe) =>
{
try
{
if (!suspended)
{
log.warning("Attempt to complete an expired connection.")
}
else
{
status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR
response.sendError(status, "Failed to write data to connection on resume")
pipe.complete
}
}
catch
{
case ex => log.error(ex, "Request completed with internal error.")
}
finally
{
context = None
log.error(t, "Request completed with internal error.")
}
}
case None =>
{
log.error(t, "Attempt to complete request with no context")
}
}
}
def onComplete(c:Continuation) = {}
def onTimeout(c:Continuation) =
{
c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader("Suspend","Timeout")
c.complete
log.debug("Request expired. CONTEXT ("+c+")")
}
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
def OK(body: String, headers:List[Tuple2[String,String]]): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)
def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body)
def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body)
def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body)
def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body)
def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body)
def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body)
def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body)
def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body)
def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body)
def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body)
def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body)
def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body)
def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString)))
}
case class Delete(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
case class Get(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
case class Head(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
case class Options(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
case class Post(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
case class Put(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
case class Trace(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}

View file

@ -0,0 +1,50 @@
/**
* Copyright 2010 Autodesk, Inc. All rights reserved.
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
*/
package akka.http
/**
* Structural type alias's required to work with both Servlet 3.0 and Jetty's Continuation API
*
* @author Garrick Evans
*/
object Types
{
import javax.servlet. {ServletContext, ServletRequest, ServletResponse}
type tAsyncRequest = {
def startAsync:tAsyncContext
}
type tAsyncContext = {
def complete:Unit
def dispatch:Unit
def dispatch(s:String):Unit
def dispatch(c:ServletContext, s:String)
def getRequest:ServletRequest
def getResponse:ServletResponse
def hasOriginalRequestAndResponse:Boolean
def setTimeout(ms:Long):Unit
def start(r:Runnable):Unit
}
type tContinuation = {
def complete:Unit
def isExpired:Boolean
def isInitial:Boolean
def isResumed:Boolean
def isSuspended:Boolean
def resume:Unit
def suspend:Unit
def undispatch:Unit
}
type tContinuationListener = {
def onComplete(c:tContinuation)
def onTimeout(c:tContinuation)
}
}

View file

@ -0,0 +1,45 @@
/**
* Copyright 2010 Autodesk, Inc. All rights reserved.
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
*/
package sample.mist
import akka.actor._
import akka.actor.Actor._
import akka.config.Supervision._
import akka.http._
/**
* Starts up the base services for http (jetty)
*/
class Boot {
val factory = SupervisorFactory(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), 3, 100),
Supervise(
actorOf[ServiceRoot],
Permanent) ::
Supervise(
actorOf[SimpleService],
Permanent)
:: Nil))
factory.newInstance.start
}
class ServiceRoot extends RootEndpoint
{
//
// use the configurable dispatcher
//
self.dispatcher = Endpoint.Dispatcher
//
// TODO: make this a config prop
//
self.id = "DefaultRootEndpoint"
}

View file

@ -0,0 +1,155 @@
/**
* Copyright 2010 Autodesk, Inc. All rights reserved.
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
*/
package sample.mist
import akka.actor._
import akka.actor.Actor._
import akka.http._
/**
* Define a top level service endpoint
* Usage: GET or POST to http://localhost:9998/simple/same or http://localhost:9998/simple/new
*
* @author Garrick Evans
*/
class SimpleService extends Actor with Endpoint
{
final val ServiceRoot = "/simple/"
final val ProvideSameActor = ServiceRoot + "same"
final val ProvideNewActor = ServiceRoot + "new"
//
// use the configurable dispatcher
//
self.dispatcher = Endpoint.Dispatcher
//
// there are different ways of doing this - in this case, we'll use a single hook function
// and discriminate in the provider; alternatively we can pair hooks & providers
//
def hook(uri: String): Boolean = ((uri == ProvideSameActor) || (uri == ProvideNewActor))
def provide(uri: String): ActorRef =
{
if (uri == ProvideSameActor)
same
else
actorOf[BoringActor].start
}
//
// this is where you want attach your endpoint hooks
//
override def preStart =
{
//
// we expect there to be one root and that it's already been started up
// obviously there are plenty of other ways to obtaining this actor
// the point is that we need to attach something (for starters anyway)
// to the root
//
val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head
root ! Endpoint.Attach(hook, provide)
}
//
// since this actor isn't doing anything else (i.e. not handling other messages)
// just assign the receive func like so...
// otherwise you could do something like:
// def myrecv = {...}
// def receive = myrecv orElse _recv
//
def receive = _recv
//
// this will be our "same" actor provided with ProvideSameActor endpoint is hit
//
lazy val same = actorOf[BoringActor].start
}
/**
* Define a service handler to respond to some HTTP requests
*/
class BoringActor extends Actor
{
import java.util.Date
import javax.ws.rs.core.MediaType
var gets = 0
var posts = 0
var lastget:Option[Date] = None
var lastpost:Option[Date] = None
def receive =
{
//
// handle a get request
//
case get:Get =>
{
//
// the content type of the response.
// similar to @Produces annotation
//
get.response.setContentType(MediaType.TEXT_HTML)
//
// "work"
//
gets += 1
lastget = Some(new Date)
//
// respond
//
val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
get.OK(res)
}
//
// handle a post request
//
case post:Post =>
{
//
// the expected content type of the request
// similar to @Consumes
//
if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED)
{
//
// the content type of the response.
// similar to @Produces annotation
//
post.response.setContentType(MediaType.TEXT_HTML)
//
// "work"
//
posts += 1
lastpost = Some(new Date)
//
// respond
//
val res = "<p>Gets: "+gets+" Posts: "+posts+"</p><p>Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"</p>"
post.OK(res)
}
else
{
post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')")
}
}
case other if other.isInstanceOf[SuspendedRequest] =>
{
other.asInstanceOf[SuspendedRequest].NotAllowed("Invalid method for this endpoint")
}
}
}