From 9136e62cd2b6cc6dae682de75cf0e962dcd1f1f4 Mon Sep 17 00:00:00 2001 From: Garrick Evans Date: Mon, 8 Nov 2010 10:54:43 -0800 Subject: [PATCH] adding back (mist) http work in a new branch. misitfy was too stale. this is WIP - trying to support both SAPI 3.0 and Jetty Continuations at once --- .../src/main/scala/AsyncHttpServlet.scala | 109 ++++++++ akka-http/src/main/scala/Endpoint.scala | 134 ++++++++++ .../src/main/scala/SuspendedRequest.scala | 252 ++++++++++++++++++ akka-http/src/main/scala/Types.scala | 50 ++++ .../src/main/scala/Boot.scala | 45 ++++ .../src/main/scala/SimpleService2.scala | 155 +++++++++++ 6 files changed, 745 insertions(+) create mode 100644 akka-http/src/main/scala/AsyncHttpServlet.scala create mode 100644 akka-http/src/main/scala/Endpoint.scala create mode 100644 akka-http/src/main/scala/SuspendedRequest.scala create mode 100644 akka-http/src/main/scala/Types.scala create mode 100644 akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala create mode 100644 akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService2.scala diff --git a/akka-http/src/main/scala/AsyncHttpServlet.scala b/akka-http/src/main/scala/AsyncHttpServlet.scala new file mode 100644 index 0000000000..7e9ccb4855 --- /dev/null +++ b/akka-http/src/main/scala/AsyncHttpServlet.scala @@ -0,0 +1,109 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + +import akka.util.Logging +import javax.servlet.http.{HttpServletResponse, HttpServlet} + +/** + * @author Garrick Evans + */ +class AsyncHttpServlet extends HttpServlet with Logging +{ + import java.util. {Date, TimeZone} + import java.text.SimpleDateFormat + import javax.servlet.ServletConfig + import javax.servlet.http.{HttpServletRequest, HttpServletResponse} + import akka.actor.ActorRegistry + import Types._ + + // + // the root endpoint for this servlet will have been booted already + // use the system property to find out the actor id and cache him + // TODO: currently this is hardcoded but really use a property + // + protected val _root = ActorRegistry.actorsFor("DefaultGridRoot").head + + /** + * Handles the HTTP request method on the servlet, suspends the connection and sends the asynchronous context + * along to the root endpoint in a SuspendedRequest message + */ + protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder: (()=>Option[tAsyncContext]) => SuspendedRequest) = + { + def suspend:Option[tAsyncContext] = + { + // + // set to effectively "already expired" + // + val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z") + gmt.setTimeZone(TimeZone.getTimeZone("GMT")) + + response.setHeader("Expires", gmt.format(new Date)) + response.setHeader("Cache-Control", "no-cache, must-revalidate") + response.setHeader("Connection","close") + + Some(request.asInstanceOf[tAsyncRequest].startAsync) + } + + // + // shoot the message to the root endpoint for processing + // IMPORTANT: the suspend method is invoked on the jetty thread not in the actor + // + val msg = builder(suspend _) + if (msg.context ne None) {_root ! msg} + } + + /** + * Subclasses can choose to have the servlet listen to the async context events + * @return A type of either AsyncListener or ContinuationListener + */ + def hook:Option[AnyRef] = None + + + // + // HttpServlet API + // + + final val Jetty7Server = "Jetty(7" + + override def init(config: ServletConfig) = + { + super.init(config) + + val context = config.getServletContext + val server = context.getServerInfo + val (major, minor) = (context.getMajorVersion, context.getMinorVersion) + + log.debug("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor) + + (major, minor) match { + + case (3,0) => { + log.debug("Supporting Java asynchronous contexts.") + } + + case (2,5) if (server startsWith Jetty7Server) => { + log.debug("Supporting Jetty asynchronous continuations.") + + } + + case _ => { + log.error("No asynchronous request handling can be supported.") + } + } + } + + + protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Delete(f, hook _)) + protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Get(f, hook _)) + protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Head(f, hook _)) + protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Options(f, hook _)) + protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Post(f, hook _)) + protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Put(f, hook _)) + protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Trace(f, hook _)) +} + diff --git a/akka-http/src/main/scala/Endpoint.scala b/akka-http/src/main/scala/Endpoint.scala new file mode 100644 index 0000000000..dd00ee4bd9 --- /dev/null +++ b/akka-http/src/main/scala/Endpoint.scala @@ -0,0 +1,134 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + +import javax.servlet.http.{HttpServletResponse, HttpServletRequest} +import akka.actor.{ActorRegistry, ActorRef, Actor} + +/** + * @author Garrick Evans + */ +trait Endpoint +{ + this: Actor => + + import Endpoint._ + + type Hook = Function[String, Boolean] + type Provider = Function[String, ActorRef] + + /** + * A convenience method to get the actor ref + */ + def actor: ActorRef = this.self + + /** + * The list of connected endpoints to which this one should/could forward the request. + * If the hook func returns true, the message will be sent to the actor returned from provider. + */ + protected var _attachments = List[Tuple2[Hook, Provider]]() + + /** + * + */ + protected def _attach(hook:Hook, provider:Provider) = + { + _attachments = (hook, provider) :: _attachments + } + + /** + * Message handling common to all endpoints, must be chained + */ + protected def _recv: Receive = + { + // + // add the endpoint - the if the uri hook matches, + // the message will be sent to the actor returned by the provider func + // + case Attach(hook, provider) => _attach(hook, provider) + + + // + // dispatch the suspended requests + // + case msg if msg.isInstanceOf[SuspendedRequest] => + { + val req = msg.asInstanceOf[SuspendedRequest] + val uri = req.request.getRequestURI + val endpoints = _attachments.filter {_._1(uri)} + + if (endpoints.size > 0) + endpoints.foreach {_._2(uri) ! req} + else + { + self.sender match + { + case Some(s) => s reply NoneAvailable(uri, req) + case None => _na(uri, req) + } + } + } + + } + + /** + * no endpoint available - completes the request with a 404 + */ + protected def _na(uri: String, req: SuspendedRequest) = + { + req.NotFound("No endpoint available for [" + uri + "]") + log.debug("No endpoint available for [" + uri + "]") + } +} + + +class RootEndpoint extends Actor with Endpoint +{ + import Endpoint._ + + final val Root = "/" + + // + // use the configurable dispatcher + // + self.dispatcher = Endpoint.Dispatcher + + override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments + + def recv: Receive = + { + case NoneAvailable(uri, req) => _na(uri, req) + case unknown => + { + log.error("Unexpected message sent to root endpoint. [" + unknown.toString + "]") + } + } + + /** + * Note that root is a little different, other endpoints should chain their own recv first + */ + def receive = {_recv orElse recv} +} + + + +object Endpoint +{ + import akka.dispatch.Dispatchers + + + /** + * leverage the akka config to tweak the dispatcher for our endpoints + */ + final val Dispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher") + + type Hook = Function[String, Boolean] + type Provider = Function[String, ActorRef] + + case class Attach(hook: Hook, provider: Provider) + case class NoneAvailable(uri: String, req: SuspendedRequest) +} diff --git a/akka-http/src/main/scala/SuspendedRequest.scala b/akka-http/src/main/scala/SuspendedRequest.scala new file mode 100644 index 0000000000..2f3ecf7912 --- /dev/null +++ b/akka-http/src/main/scala/SuspendedRequest.scala @@ -0,0 +1,252 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + + +import akka.util.Logging +import Types._ + + +/** + * @author Garrick Evans + */ +trait SuspendedRequest extends Logging +{ + import javax.servlet.http.{HttpServletResponse, HttpServletRequest} + import org.eclipse.jetty.server._ + import org.eclipse.jetty.continuation._ + + final val Timeout = "timeout" + final val DefaultTimeout = 30000 + + var context: Option[tAsyncContext] = None + + def init(suspend:()=>Option[tAsyncContext], callback:()=>Option[AnyRef]) = + { + suspend() match + { + case (Some(continuation)) => + { + context = Some(continuation) + val ac = continuation.asInstanceOf[AsyncContinuation] + + (ac.isInitial, ac.isSuspended, ac.isExpired) match + { + // + // the fresh continuation + // + case (true, false, false) => + { + ac.setTimeout(DefaultTimeout) + + //callback() foreach {listener => ac.addContinuationListener(listener)} + //ac.addContinuationListener(this) + ac.suspend + } + // + // the timeout was reset and the continuation was resumed + // need to update the timeout and resuspend + // very important to clear the context so the request is not rebroadcast to the endpoint + // + case (false, false, false) => + { + ac.setTimeout(ac.getAttribute(Timeout).asInstanceOf[Long]) + ac.suspend + ac.removeAttribute(Timeout) + + context = None + log.debug("Updating and re-suspending request. TIMEOUT ("+ac.getTimeout+" ms)") + } + // + // we don't actually expect to get this one here since the listener will finish him off + // + case (_, _, true) => + { + response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT) + context = None + log.warning("Expired request arrived here unexpectedly. REQUEST ("+continuation.toString+")") + } + case unknown => + { + log.error("Unexpected continuation state detected - cancelling") + ac.cancel + context = None + } + } + } + case _ => + { + log.error("Cannot initialize request without an asynchronous context.") + } + } + } + + def request = context.get.getRequest.asInstanceOf[HttpServletRequest] + def response = context.get.getResponse.asInstanceOf[HttpServletResponse] + def suspended = + { + context match + { + case Some(continuation) => + { + val ac = continuation.asInstanceOf[AsyncContinuation] + (ac.isSuspended || (ac.getAttribute(Timeout) != null)) + } + case None => false + } + } + + def getHeaderOrElse(name: String, default: Function[Any, String]): String = + { + request.getHeader(name) match + { + case null => default(null) + case s => s + } + } + + def getParameterOrElse(name: String, default: Function[Any, String]): String = + { + request.getParameter(name) match + { + case null => default(null) + case s => s + } + } + + /** + * Allow for an updatable timeout + */ + def timeout(ms:Long):Unit = + { + context match + { + case Some(continuation) => + { + continuation.asInstanceOf[AsyncContinuation].setAttribute(Timeout, ms) + continuation.asInstanceOf[AsyncContinuation].resume + } + case None => log.error("Cannot update the timeout on an unsuspended request") + } + } + + def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]()) + + def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean = + { + var ok = false + context match + { + case Some(pipe) => + { + try + { + if (!suspended) + { + log.warning("Attempt to complete an expired connection.") + } + else + { + response.setStatus(status) + headers foreach {h => response.setHeader(h._1, h._2)} + response.getWriter.write(body) + response.getWriter.close + response.flushBuffer + pipe.complete + ok = true + } + } + catch + { + case ex => log.error(ex, "Failed to write data to connection on resume - the client probably disconnected") + } + finally + { + context = None + } + } + case None => + { + log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")") + } + } + ok + } + + def complete(t: Throwable): Unit = + { + var status = 0 + context match + { + case Some(pipe) => + { + try + { + if (!suspended) + { + log.warning("Attempt to complete an expired connection.") + } + else + { + status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR + response.sendError(status, "Failed to write data to connection on resume") + pipe.complete + } + } + catch + { + case ex => log.error(ex, "Request completed with internal error.") + } + finally + { + context = None + log.error(t, "Request completed with internal error.") + } + } + case None => + { + log.error(t, "Attempt to complete request with no context") + } + } + } + + def onComplete(c:Continuation) = {} + def onTimeout(c:Continuation) = + { + c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader("Suspend","Timeout") + c.complete + log.debug("Request expired. CONTEXT ("+c+")") + } + + + def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body) + def OK(body: String, headers:List[Tuple2[String,String]]): Boolean = complete(HttpServletResponse.SC_OK, body, headers) + def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body) + def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body) + def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body) + def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body) + def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body) + def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body) + def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body) + def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body) + def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body) + def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body) + def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body) + def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body) + def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body) + def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString))) +} + + +case class Delete(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} +case class Get(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} +case class Head(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} +case class Options(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} +case class Post(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} +case class Put(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} +case class Trace(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} + diff --git a/akka-http/src/main/scala/Types.scala b/akka-http/src/main/scala/Types.scala new file mode 100644 index 0000000000..ba423194a3 --- /dev/null +++ b/akka-http/src/main/scala/Types.scala @@ -0,0 +1,50 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + + +/** + * Structural type alias's required to work with both Servlet 3.0 and Jetty's Continuation API + * + * @author Garrick Evans + */ +object Types +{ + import javax.servlet. {ServletContext, ServletRequest, ServletResponse} + + type tAsyncRequest = { + def startAsync:tAsyncContext + } + + type tAsyncContext = { + def complete:Unit + def dispatch:Unit + def dispatch(s:String):Unit + def dispatch(c:ServletContext, s:String) + def getRequest:ServletRequest + def getResponse:ServletResponse + def hasOriginalRequestAndResponse:Boolean + def setTimeout(ms:Long):Unit + def start(r:Runnable):Unit + } + + type tContinuation = { + def complete:Unit + def isExpired:Boolean + def isInitial:Boolean + def isResumed:Boolean + def isSuspended:Boolean + def resume:Unit + def suspend:Unit + def undispatch:Unit + } + + type tContinuationListener = { + def onComplete(c:tContinuation) + def onTimeout(c:tContinuation) + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala new file mode 100644 index 0000000000..38087b03db --- /dev/null +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala @@ -0,0 +1,45 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package sample.mist + +import akka.actor._ +import akka.actor.Actor._ +import akka.config.Supervision._ +import akka.http._ + + +/** + * Starts up the base services for http (jetty) + */ +class Boot { + val factory = SupervisorFactory( + SupervisorConfig( + OneForOneStrategy(List(classOf[Exception]), 3, 100), + Supervise( + actorOf[ServiceRoot], + Permanent) :: + Supervise( + actorOf[SimpleService], + Permanent) + :: Nil)) + factory.newInstance.start +} + + +class ServiceRoot extends RootEndpoint +{ + // + // use the configurable dispatcher + // + self.dispatcher = Endpoint.Dispatcher + + // + // TODO: make this a config prop + // + self.id = "DefaultRootEndpoint" +} + diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService2.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService2.scala new file mode 100644 index 0000000000..333cdb3c86 --- /dev/null +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService2.scala @@ -0,0 +1,155 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package sample.mist + +import akka.actor._ +import akka.actor.Actor._ +import akka.http._ + + + +/** + * Define a top level service endpoint + * Usage: GET or POST to http://localhost:9998/simple/same or http://localhost:9998/simple/new + * + * @author Garrick Evans + */ +class SimpleService extends Actor with Endpoint +{ + final val ServiceRoot = "/simple/" + final val ProvideSameActor = ServiceRoot + "same" + final val ProvideNewActor = ServiceRoot + "new" + + // + // use the configurable dispatcher + // + self.dispatcher = Endpoint.Dispatcher + + // + // there are different ways of doing this - in this case, we'll use a single hook function + // and discriminate in the provider; alternatively we can pair hooks & providers + // + def hook(uri: String): Boolean = ((uri == ProvideSameActor) || (uri == ProvideNewActor)) + def provide(uri: String): ActorRef = + { + if (uri == ProvideSameActor) + same + else + actorOf[BoringActor].start + } + + // + // this is where you want attach your endpoint hooks + // + override def preStart = + { + // + // we expect there to be one root and that it's already been started up + // obviously there are plenty of other ways to obtaining this actor + // the point is that we need to attach something (for starters anyway) + // to the root + // + val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head + root ! Endpoint.Attach(hook, provide) + } + + // + // since this actor isn't doing anything else (i.e. not handling other messages) + // just assign the receive func like so... + // otherwise you could do something like: + // def myrecv = {...} + // def receive = myrecv orElse _recv + // + def receive = _recv + + + // + // this will be our "same" actor provided with ProvideSameActor endpoint is hit + // + lazy val same = actorOf[BoringActor].start +} + +/** + * Define a service handler to respond to some HTTP requests + */ +class BoringActor extends Actor +{ + import java.util.Date + import javax.ws.rs.core.MediaType + + var gets = 0 + var posts = 0 + var lastget:Option[Date] = None + var lastpost:Option[Date] = None + + def receive = + { + // + // handle a get request + // + case get:Get => + { + // + // the content type of the response. + // similar to @Produces annotation + // + get.response.setContentType(MediaType.TEXT_HTML) + + // + // "work" + // + gets += 1 + lastget = Some(new Date) + + // + // respond + // + val res = "

Gets: "+gets+" Posts: "+posts+"

Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"

" + get.OK(res) + } + + // + // handle a post request + // + case post:Post => + { + // + // the expected content type of the request + // similar to @Consumes + // + if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) + { + // + // the content type of the response. + // similar to @Produces annotation + // + post.response.setContentType(MediaType.TEXT_HTML) + + // + // "work" + // + posts += 1 + lastpost = Some(new Date) + + // + // respond + // + val res = "

Gets: "+gets+" Posts: "+posts+"

Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"

" + post.OK(res) + } + else + { + post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')") + } + } + + case other if other.isInstanceOf[SuspendedRequest] => + { + other.asInstanceOf[SuspendedRequest].NotAllowed("Invalid method for this endpoint") + } + } +}