diff --git a/akka-http/src/main/scala/AkkaHttpServlet.scala b/akka-http/src/main/scala/AkkaHttpServlet.scala deleted file mode 100644 index 830ac16288..0000000000 --- a/akka-http/src/main/scala/AkkaHttpServlet.scala +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. - */ - -package akka.http - -import akka.util.Logging -import javax.servlet.http.HttpServlet - -/** - * @author Garrick Evans - */ -class AkkaHttpServlet extends HttpServlet with Logging -{ - import java.util. {Date, TimeZone} - import java.text.SimpleDateFormat - import javax.servlet.ServletConfig - import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - import akka.actor.ActorRegistry - import Types._ - import AkkaHttpServlet._ - - - /** - * The root endpoint actor - */ - protected val _root = ActorRegistry.actorsFor(RootActorID).head - - /** - * Server-specific method factory - */ - protected var _factory:Option[RequestMethodFactory] = None - - /** - * Handles all servlet requests - */ - protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder:(() => tAsyncRequestContext) => RequestMethod) = - { - def suspend:tAsyncRequestContext = { - // - // set to right now, which is effectively "already expired" - // - - response.setDateHeader("Expires", System.currentTimeMillis) - response.setHeader("Cache-Control", "no-cache, must-revalidate") - - // - // no keep-alive? - // - if (ConnectionClose) response.setHeader("Connection","close") - - // - // suspend the request - // TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs - // - request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext] - } - - // - // shoot the message to the root endpoint for processing - // IMPORTANT: the suspend method is invoked on the server thread not in the actor - // - val method = builder(suspend _) - if (method.go) _root ! method - } - - - // - // HttpServlet API - // - - override def init(config: ServletConfig) = - { - super.init(config) - - val context = config.getServletContext - val server = context.getServerInfo - val (major, minor) = (context.getMajorVersion, context.getMinorVersion) - - log.info("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor) - - (major, minor) match { - - case (3,0) => { - log.info("Supporting Java asynchronous contexts.") - _factory = Some(Servlet30ContextMethodFactory) - } - - case _ if (server.toLowerCase startsWith JettyServer) => { - - log.info("Supporting Jetty asynchronous continuations.") - _factory = Some(JettyContinuationMethodFactory) - } - - case _ => { - log.error("No asynchronous request handling can be supported.") - } - } - } - - protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Delete) - protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Get) - protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Head) - protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Options) - protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Post) - protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Put) - protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Trace) -} - -object AkkaHttpServlet -{ - import akka.config.Config._ - - final val JettyServer = "jetty" - final val TimeoutAttribute = "timeout" - - val ConnectionClose = config.getBool("akka.rest.connection-close", true) - val RootActorBuiltin = config.getBool("akka.rest.root-actor-builtin", true) - val RootActorID = config.getString("akka.rest.root-actor-id", "_httproot") - val DefaultTimeout = config.getLong("akka.rest.timeout", 1000) - val ExpiredHeaderName = config.getString("akka.rest.expired-header-name", "Async-Timeout") - val ExpiredHeaderValue = config.getString("akka.rest.expired-header-value", "expired") -} - - - diff --git a/akka-http/src/main/scala/Endpoint.scala b/akka-http/src/main/scala/Endpoint.scala deleted file mode 100644 index e44b1453f9..0000000000 --- a/akka-http/src/main/scala/Endpoint.scala +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. - */ - -package akka.http - -import javax.servlet.http.{HttpServletResponse, HttpServletRequest} -import akka.actor.{ActorRegistry, ActorRef, Actor} - -/** - * @author Garrick Evans - */ -trait Endpoint -{ - this: Actor => - - import Endpoint._ - - /** - * A convenience method to get the actor ref - */ - def actor: ActorRef = this.self - - /** - * The list of connected endpoints to which this one should/could forward the request. - * If the hook func returns true, the message will be sent to the actor returned from provider. - */ - protected var _attachments = List[Tuple2[Hook, Provider]]() - - /** - * - */ - protected def _attach(hook:Hook, provider:Provider) = _attachments = (hook, provider) :: _attachments - - /** - * Message handling common to all endpoints, must be chained - */ - protected def _recv: Receive = { - // - // add the endpoint - the if the uri hook matches, - // the message will be sent to the actor returned by the provider func - // - case Attach(hook, provider) => _attach(hook, provider) - - // - // dispatch the suspended requests - // - case msg if msg.isInstanceOf[RequestMethod] => { - val req = msg.asInstanceOf[RequestMethod] - val uri = req.request.getRequestURI - val endpoints = _attachments.filter { _._1(uri) } - - if (!endpoints.isEmpty) - endpoints.foreach { _._2(uri) ! req } - else { - self.sender match { - case Some(s) => s reply NoneAvailable(uri, req) - case None => _na(uri, req) - } - } - } - } - - /** - * no endpoint available - completes the request with a 404 - */ - protected def _na(uri: String, req: RequestMethod) = { - req.NotFound("No endpoint available for [" + uri + "]") - log.debug("No endpoint available for [" + uri + "]") - } -} - - -class RootEndpoint extends Actor with Endpoint -{ - import Endpoint._ - import AkkaHttpServlet._ - - final val Root = "/" - - // - // use the configurable dispatcher - // - self.dispatcher = Endpoint.Dispatcher - - // - // adopt the configured id - // - if (RootActorBuiltin) self.id = RootActorID - - override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments - - def recv: Receive = { - case NoneAvailable(uri, req) => _na(uri, req) - case unknown => - log.error("Unexpected message sent to root endpoint. [" + unknown.toString + "]") - } - - /** - * Note that root is a little different, other endpoints should chain their own recv first - */ - def receive = _recv orElse recv -} - - - -object Endpoint -{ - import akka.dispatch.Dispatchers - - - /** - * leverage the akka config to tweak the dispatcher for our endpoints - */ - final val Dispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher") - - type Hook = Function[String, Boolean] - type Provider = Function[String, ActorRef] - - case class Attach(hook: Hook, provider: Provider) - case class NoneAvailable(uri: String, req: RequestMethod) -} diff --git a/akka-http/src/main/scala/JettyContinuation.scala b/akka-http/src/main/scala/JettyContinuation.scala index d2b5bc9aaf..3d53548352 100644 --- a/akka-http/src/main/scala/JettyContinuation.scala +++ b/akka-http/src/main/scala/JettyContinuation.scala @@ -17,7 +17,7 @@ import Types._ trait JettyContinuation extends ContinuationListener with akka.util.Logging { import javax.servlet.http.HttpServletResponse - import AkkaHttpServlet._ + import MistSettings._ val builder:() => tAsyncRequestContext val context: Option[tAsyncRequestContext] = Some(builder()) @@ -103,8 +103,7 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging } } -object JettyContinuationMethodFactory extends RequestMethodFactory -{ +object JettyContinuationMethodFactory extends RequestMethodFactory { def Delete(f: () => tAsyncRequestContext): RequestMethod = new Delete(f) with JettyContinuation def Get(f: () => tAsyncRequestContext): RequestMethod = new Get(f) with JettyContinuation def Head(f: () => tAsyncRequestContext): RequestMethod = new Head(f) with JettyContinuation diff --git a/akka-http/src/main/scala/Mist.scala b/akka-http/src/main/scala/Mist.scala new file mode 100644 index 0000000000..44c66c8f05 --- /dev/null +++ b/akka-http/src/main/scala/Mist.scala @@ -0,0 +1,466 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.http + +import akka.util.Logging +import javax.servlet.http.HttpServlet +import javax.servlet.http.{HttpServletResponse, HttpServletRequest} +import akka.actor.{ActorRegistry, ActorRef, Actor} +import javax.servlet.Filter + +/** + * @author Garrick Evans + */ +object MistSettings { + import akka.config.Config._ + + final val JettyServer = "jetty" + final val TimeoutAttribute = "timeout" + + val ConnectionClose = config.getBool("akka.http.connection-close", true) + val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true) + val RootActorID = config.getString("akka.http.root-actor-id", "_httproot") + val DefaultTimeout = config.getLong("akka.http.timeout", 1000) + val ExpiredHeaderName = config.getString("akka.http.expired-header-name", "Async-Timeout") + val ExpiredHeaderValue = config.getString("akka.http.expired-header-value", "expired") +} + +/** + * Structural type alias's required to work with both Servlet 3.0 and Jetty's Continuation API + * + * @author Garrick Evans + */ +object Types { + import javax.servlet. {ServletRequest, ServletResponse} + + /** + * Represents an asynchronous request + */ + type tAsyncRequest = { + def startAsync: tAsyncRequestContext + } + + /** + * Used to match both AsyncContext and AsyncContinuation in order to complete the request + */ + type tAsyncRequestContext = { + def complete: Unit + def getRequest: ServletRequest + def getResponse: ServletResponse + } + + type Header = Tuple2[String,String] + type Headers = List[Header] + + def Headers(): Headers = Nil +} + +import Types._ + +/** + * + */ +trait Mist extends Logging { + import javax.servlet.{ServletContext} + import MistSettings._ + + /** + * The root endpoint actor + */ + protected val _root = ActorRegistry.actorsFor(RootActorID).head + + /** + * Server-specific method factory + */ + protected var _factory: Option[RequestMethodFactory] = None + + /** + * Handles all servlet requests + */ + protected def mistify(request: HttpServletRequest, + response: HttpServletResponse) + (builder: (() => tAsyncRequestContext) => RequestMethod) = { + def suspend: tAsyncRequestContext = { + // + // set to right now, which is effectively "already expired" + // + response.setDateHeader("Expires", System.currentTimeMillis) + response.setHeader("Cache-Control", "no-cache, must-revalidate") + + // + // no keep-alive? + // + if (ConnectionClose) response.setHeader("Connection","close") + + // + // suspend the request + // TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs + // + request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext] + } + + // + // shoot the message to the root endpoint for processing + // IMPORTANT: the suspend method is invoked on the server thread not in the actor + // + val method = builder(suspend _) + if (method.go) _root ! method + } + + /** + * Sets up what mist needs to be able to service requests + * must be called prior to dispatching to "mistify" + */ + def initMist(context: ServletContext) { + val server = context.getServerInfo + val (major, minor) = (context.getMajorVersion, context.getMinorVersion) + + log.info("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor) + + _factory = if (major >= 3) { + log.info("Supporting Java asynchronous contexts.") + Some(Servlet30ContextMethodFactory) + } else if (server.toLowerCase startsWith JettyServer) { + log.info("Supporting Jetty asynchronous continuations.") + Some(JettyContinuationMethodFactory) + } else { + log.error("No asynchronous request handling can be supported.") + None + } + } +} + +/** + * AkkaMistServlet adds support to bridge Http and Actors in an asynchronous fashion + * Async impls currently supported: Servlet3.0, Jetty Continuations + */ +class AkkaMistServlet extends HttpServlet with Mist { + import javax.servlet.{ServletConfig} + + /** + * Initializes Mist + */ + override def init(config: ServletConfig) { + super.init(config) + initMist(config.getServletContext) + } + + protected override def doDelete(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Delete) + protected override def doGet(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Get) + protected override def doHead(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Head) + protected override def doOptions(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Options) + protected override def doPost(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Post) + protected override def doPut(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Put) + protected override def doTrace(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res)(_factory.get.Trace) +} + +/** + * Proof-of-concept, use at own risk + * Will be officially supported in a later release + */ +class AkkaMistFilter extends Filter with Mist { + import javax.servlet.{ServletRequest, ServletResponse, FilterConfig, FilterChain} + + /** + * Initializes Mist + */ + def init(config: FilterConfig) { + initMist(config.getServletContext) + } + + /** + * Decide how/if to handle the request + */ + override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain) { + (req,res) match { + case (hreq: HttpServletRequest, hres: HttpServletResponse) => + hreq.getMethod.toUpperCase match { + case "DELETE" => mistify(hreq, hres)(_factory.get.Delete) + case "GET" => mistify(hreq, hres)(_factory.get.Get) + case "HEAD" => mistify(hreq, hres)(_factory.get.Head) + case "OPTIONS" => mistify(hreq, hres)(_factory.get.Options) + case "POST" => mistify(hreq, hres)(_factory.get.Post) + case "PUT" => mistify(hreq, hres)(_factory.get.Put) + case "TRACE" => mistify(hreq, hres)(_factory.get.Trace) + case unknown => log.warn("Unknown http method: %s",unknown) + } + chain.doFilter(req,res) + case _ => chain.doFilter(req,res) + } + } + + override def destroy {} +} + +/////////////////////////////////////////// +// Endpoints +/////////////////////////////////////////// + +object Endpoint { + import akka.dispatch.Dispatchers + + /** + * leverage the akka config to tweak the dispatcher for our endpoints + */ + final val Dispatcher = Dispatchers.fromConfig("akka.http.comet-dispatcher") + + type Hook = Function[String, Boolean] + type Provider = Function[String, ActorRef] + + case class Attach(hook: Hook, provider: Provider) + case class NoneAvailable(uri: String, req: RequestMethod) +} + +/** + * @author Garrick Evans + */ +trait Endpoint { this: Actor => + + import Endpoint._ + + /** + * A convenience method to get the actor ref + */ + def actor: ActorRef = this.self + + /** + * The list of connected endpoints to which this one should/could forward the request. + * If the hook func returns true, the message will be sent to the actor returned from provider. + */ + protected var _attachments = List[Tuple2[Hook, Provider]]() + + /** + * + */ + protected def _attach(hook:Hook, provider:Provider) = _attachments = (hook, provider) :: _attachments + + /** + * Message handling common to all endpoints, must be chained + */ + protected def handleHttpRequest: Receive = { + // + // add the endpoint - the if the uri hook matches, + // the message will be sent to the actor returned by the provider func + // + case Attach(hook, provider) => _attach(hook, provider) + + // + // dispatch the suspended requests + // + case req: RequestMethod => { + val uri = req.request.getRequestURI + val endpoints = _attachments.filter { _._1(uri) } + + if (!endpoints.isEmpty) + endpoints.foreach { _._2(uri) ! req } + else { + self.sender match { + case Some(s) => s reply NoneAvailable(uri, req) + case None => _na(uri, req) + } + } + } + } + + /** + * no endpoint available - completes the request with a 404 + */ + protected def _na(uri: String, req: RequestMethod) = { + req.NotFound("No endpoint available for [" + uri + "]") + log.debug("No endpoint available for [" + uri + "]") + } +} + + +class RootEndpoint extends Actor with Endpoint { + import Endpoint._ + import MistSettings._ + + final val Root = "/" + + // + // use the configurable dispatcher + // + self.dispatcher = Endpoint.Dispatcher + + // + // adopt the configured id + // + if (RootActorBuiltin) self.id = RootActorID + + override def preStart = + _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments + + //TODO: Is this needed? + //override def postRestart = + // _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments + + def recv: Receive = { + case NoneAvailable(uri, req) => _na(uri, req) + case unknown => log.error("Unexpected message sent to root endpoint. [" + unknown + "]") + } + + /** + * Note that root is a little different, other endpoints should chain their own recv first + */ + def receive = handleHttpRequest orElse recv +} + +/////////////////////////////////////////// +// RequestMethods +/////////////////////////////////////////// + +/** + * Basic description of the suspended async http request. + * Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations) + * + * @author Garrick Evans + */ +trait RequestMethod extends Logging +{ + import java.io.IOException + import javax.servlet.http.{HttpServletResponse, HttpServletRequest} + + // + // required implementations + // + + val builder: () => tAsyncRequestContext + + /** + * Provides a general type for the underlying context + * + * @return a completable request context + */ + val context: Option[tAsyncRequestContext] + def go:Boolean + + /** + * Updates (resets) the timeout + * + * @return true if updated, false if not supported + */ + def timeout(ms: Long): Boolean + + /** + * Status of the suspension + */ + def suspended: Boolean + + // + // convenience funcs + // + + def request = context.get.getRequest.asInstanceOf[HttpServletRequest] + def response = context.get.getResponse.asInstanceOf[HttpServletResponse] + + def getHeaderOrElse(name: String, default: Function[Any, String]): String = + request.getHeader(name) match { + case null => default(null) + case s => s + } + + def getParameterOrElse(name: String, default: Function[Any, String]): String = + request.getParameter(name) match { + case null => default(null) + case s => s + } + + + def complete(status: Int, body: String): Boolean = complete(status, body, Headers()) + + def complete(status: Int, body: String, headers: Headers): Boolean = + context match { + case Some(pipe) => { + try { + if (!suspended) { + log.warning("Attempt to complete an expired connection.") + false + } + else { + response.setStatus(status) + headers foreach {h => response.setHeader(h._1, h._2)} + response.getWriter.write(body) + response.getWriter.close + response.flushBuffer + pipe.complete + true + } + } catch { + case io => + log.error(io, "Failed to write data to connection on resume - the client probably disconnected") + false + } + } + + case None => + log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")") + false + } + + def complete(t: Throwable) { + context match { + case Some(pipe) => { + try { + if (!suspended) { + log.warning("Attempt to complete an expired connection.") + } + else { + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume") + pipe.complete + } + } + catch { + case io: IOException => log.error(io, "Request completed with internal error.") + } + finally { + log.error(t, "Request completed with internal error.") + } + } + + case None => + log.error(t, "Attempt to complete request with no context") + } + } + + /** + * Utility methods to send responses back + */ + + def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body) + def OK(body: String, headers:Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers) + def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body) + def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body) + def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body) + def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body) + def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body) + def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body) + def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body) + def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body) + def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body) + def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body) + def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body) + def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body) + def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body) + def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString))) +} + +abstract class Delete(val builder: () => tAsyncRequestContext) extends RequestMethod +abstract class Get(val builder: () => tAsyncRequestContext) extends RequestMethod +abstract class Head(val builder: () => tAsyncRequestContext) extends RequestMethod +abstract class Options(val builder: () => tAsyncRequestContext) extends RequestMethod +abstract class Post(val builder: () => tAsyncRequestContext) extends RequestMethod +abstract class Put(val builder: () => tAsyncRequestContext) extends RequestMethod +abstract class Trace(val builder: () => tAsyncRequestContext) extends RequestMethod + +trait RequestMethodFactory { + def Delete(f: () => tAsyncRequestContext): RequestMethod + def Get(f: () => tAsyncRequestContext): RequestMethod + def Head(f: () => tAsyncRequestContext): RequestMethod + def Options(f: () => tAsyncRequestContext): RequestMethod + def Post(f: () => tAsyncRequestContext): RequestMethod + def Put(f: () => tAsyncRequestContext): RequestMethod + def Trace(f: () => tAsyncRequestContext): RequestMethod +} diff --git a/akka-http/src/main/scala/RequestMethod.scala b/akka-http/src/main/scala/RequestMethod.scala deleted file mode 100644 index 658a21874c..0000000000 --- a/akka-http/src/main/scala/RequestMethod.scala +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. - */ - -package akka.http - - -import akka.util.Logging -import Types._ - - -/** - * Basic description of the suspended async http request. - * Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations) - * - * @author Garrick Evans - */ -trait RequestMethod extends Logging -{ - import java.io.IOException - import javax.servlet.http.{HttpServletResponse, HttpServletRequest} - - // - // required implementations - // - - val builder:() => tAsyncRequestContext - - /** - * Provides a general type for the underlying context - * - * @return a completable request context - */ - val context: Option[tAsyncRequestContext] - def go:Boolean - - /** - * Updates (resets) the timeout - * - * @return true if updated, false if not supported - */ - def timeout(ms: Long): Boolean - - /** - * Status of the suspension - */ - def suspended: Boolean - - // - // convenience funcs - // - - def request = context.get.getRequest.asInstanceOf[HttpServletRequest] - def response = context.get.getResponse.asInstanceOf[HttpServletResponse] - - def getHeaderOrElse(name: String, default: Function[Any, String]): String = - request.getHeader(name) match { - case null => default(null) - case s => s - } - - def getParameterOrElse(name: String, default: Function[Any, String]): String = - request.getParameter(name) match { - case null => default(null) - case s => s - } - - - def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]()) - - def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean = - context match { - case Some(pipe) => { - try { - if (!suspended) { - log.warning("Attempt to complete an expired connection.") - false - } - else { - response.setStatus(status) - headers foreach {h => response.setHeader(h._1, h._2)} - response.getWriter.write(body) - response.getWriter.close - response.flushBuffer - pipe.complete - true - } - } catch { - case io => - log.error(io, "Failed to write data to connection on resume - the client probably disconnected") - false - } - } - - case None => - log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")") - false - } - - def complete(t: Throwable) { - context match { - case Some(pipe) => { - try { - if (!suspended) { - log.warning("Attempt to complete an expired connection.") - } - else { - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume") - pipe.complete - } - } - catch { - case io:IOException => log.error(io, "Request completed with internal error.") - } - finally { - log.error(t, "Request completed with internal error.") - } - } - - case None => - log.error(t, "Attempt to complete request with no context") - } - } - - - def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body) - def OK(body: String, headers:List[Tuple2[String,String]]): Boolean = complete(HttpServletResponse.SC_OK, body, headers) - def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body) - def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body) - def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body) - def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body) - def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body) - def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body) - def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body) - def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body) - def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body) - def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body) - def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body) - def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body) - def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body) - def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString))) -} - -abstract class Delete(val builder: () => tAsyncRequestContext) extends RequestMethod -abstract class Get(val builder: () => tAsyncRequestContext) extends RequestMethod -abstract class Head(val builder: () => tAsyncRequestContext) extends RequestMethod -abstract class Options(val builder: () => tAsyncRequestContext) extends RequestMethod -abstract class Post(val builder: () => tAsyncRequestContext) extends RequestMethod -abstract class Put(val builder: () => tAsyncRequestContext) extends RequestMethod -abstract class Trace(val builder: () => tAsyncRequestContext) extends RequestMethod - -trait RequestMethodFactory -{ - def Delete(f: () => tAsyncRequestContext): RequestMethod - def Get(f: () => tAsyncRequestContext): RequestMethod - def Head(f: () => tAsyncRequestContext): RequestMethod - def Options(f: () => tAsyncRequestContext): RequestMethod - def Post(f: () => tAsyncRequestContext): RequestMethod - def Put(f: () => tAsyncRequestContext): RequestMethod - def Trace(f: () => tAsyncRequestContext): RequestMethod -} diff --git a/akka-http/src/main/scala/Servlet30Context.scala b/akka-http/src/main/scala/Servlet30Context.scala index cf31eccbc4..f890ef3e21 100644 --- a/akka-http/src/main/scala/Servlet30Context.scala +++ b/akka-http/src/main/scala/Servlet30Context.scala @@ -1,7 +1,5 @@ /** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + * Copyright (C) 2009-2010 Scalable Solutions AB */ package akka.http @@ -16,7 +14,7 @@ import Types._ trait Servlet30Context extends AsyncListener with akka.util.Logging { import javax.servlet.http.HttpServletResponse - import AkkaHttpServlet._ + import MistSettings._ val builder: () => tAsyncRequestContext val context: Option[tAsyncRequestContext] = Some(builder()) @@ -31,13 +29,13 @@ trait Servlet30Context extends AsyncListener with akka.util.Logging def suspended = true - def timeout(ms:Long):Boolean = { + def timeout(ms: Long): Boolean = { try { _ac setTimeout ms true } catch { - case ex:IllegalStateException => + case ex: IllegalStateException => log.info("Cannot update timeout - already returned to container") false } @@ -46,13 +44,13 @@ trait Servlet30Context extends AsyncListener with akka.util.Logging // // AsyncListener // - def onComplete(e:AsyncEvent) = {} - def onError(e:AsyncEvent) = e.getThrowable match { + def onComplete(e: AsyncEvent) {} + def onError(e: AsyncEvent) = e.getThrowable match { case null => log.warning("Error occured...") case t => log.warning(t, "Error occured") } - def onStartAsync(e:AsyncEvent) = {} - def onTimeout(e:AsyncEvent) = { + def onStartAsync(e: AsyncEvent) {} + def onTimeout(e: AsyncEvent) = { e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue) e.getAsyncContext.complete } diff --git a/akka-http/src/main/scala/Types.scala b/akka-http/src/main/scala/Types.scala deleted file mode 100644 index c736babca8..0000000000 --- a/akka-http/src/main/scala/Types.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. - */ - -package akka.http - - -/** - * Structural type alias's required to work with both Servlet 3.0 and Jetty's Continuation API - * - * @author Garrick Evans - */ -object Types -{ - import javax.servlet. {ServletRequest, ServletResponse} - - type tAsyncRequest = { - def startAsync: tAsyncRequestContext - } - - /** - * Used to match both AsyncContext and AsyncContinuation in order to complete the request - */ - type tAsyncRequestContext = { - def complete: Unit - def getRequest: ServletRequest - def getResponse: ServletResponse - } -} \ No newline at end of file diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/InterestingService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/InterestingService.scala index fff28b8adb..e00a7e8120 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/InterestingService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/InterestingService.scala @@ -18,52 +18,50 @@ import javax.servlet.http.HttpServletResponse * * @author Garrick Evans */ -class InterestingService extends Actor with Endpoint -{ +class InterestingService extends Actor with Endpoint { + // + // use the configurable dispatcher + // + self.dispatcher = Endpoint.Dispatcher + final val ServiceRoot = "/interesting/" final val Multi = ServiceRoot + "multi/" - // - // use the configurable dispatcher - // - self.dispatcher = Endpoint.Dispatcher - - // - // The "multi" endpoint shows forking off multiple actions per request - // It is triggered by POSTing to http://localhost:9998/interesting/multi/{foo} - // Try with/without a header named "Test-Token" - // Try with/without a form parameter named "Data" - // - def hookMultiActionA(uri: String): Boolean = {(uri startsWith Multi)} + // + // The "multi" endpoint shows forking off multiple actions per request + // It is triggered by POSTing to http://localhost:9998/interesting/multi/{foo} + // Try with/without a header named "Test-Token" + // Try with/without a form parameter named "Data" + // + def hookMultiActionA(uri: String): Boolean = uri startsWith Multi def provideMultiActionA(uri: String): ActorRef = actorOf(new ActionAActor(complete)).start - def hookMultiActionB(uri: String): Boolean = {(uri startsWith Multi)} + def hookMultiActionB(uri: String): Boolean = uri startsWith Multi def provideMultiActionB(uri: String): ActorRef = actorOf(new ActionBActor(complete)).start + // + // this is where you want attach your endpoint hooks + // + override def preStart { // - // this is where you want attach your endpoint hooks + // we expect there to be one root and that it's already been started up + // obviously there are plenty of other ways to obtaining this actor + // the point is that we need to attach something (for starters anyway) + // to the root // - override def preStart = - { - // - // we expect there to be one root and that it's already been started up - // obviously there are plenty of other ways to obtaining this actor - // the point is that we need to attach something (for starters anyway) - // to the root - // - val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head - root ! Endpoint.Attach(hookMultiActionA, provideMultiActionA) - root ! Endpoint.Attach(hookMultiActionB, provideMultiActionB) - } + val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head + root ! Endpoint.Attach(hookMultiActionA, provideMultiActionA) + root ! Endpoint.Attach(hookMultiActionB, provideMultiActionB) + } // // since this actor isn't doing anything else (i.e. not handling other messages) // just assign the receive func like so... // otherwise you could do something like: // def myrecv = {...} - // def receive = myrecv orElse _recv + // def receive = myrecv orElse handleHttpRequest // - def receive = _recv + def receive = handleHttpRequest // @@ -72,86 +70,72 @@ class InterestingService extends Actor with Endpoint lazy val complete = actorOf[ActionCompleteActor].start } -class ActionAActor(complete:ActorRef) extends Actor -{ +class ActionAActor(complete: ActorRef) extends Actor { import javax.ws.rs.core.MediaType - def receive = - { + def receive = { + // + // handle a post request + // + case post:Post => { // - // handle a post request + // the expected content type of the request + // similar to @Consumes // - case post:Post => - { + if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) { // - // the expected content type of the request - // similar to @Consumes + // the content type of the response. + // similar to @Produces annotation // - if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) - { - // - // the content type of the response. - // similar to @Produces annotation - // post.response.setContentType(MediaType.TEXT_HTML) // // get the resource name // val name = post.request.getRequestURI.substring("/interesting/multi/".length) - if (name.length % 2 == 0) - { - post.response.getWriter.write("

Action A verified request.

") - } + val response = if (name.length % 2 == 0) + "

Action A verified request.

" else - { - post.response.getWriter.write("

Action A could not verify request.

") - } + "

Action A could not verify request.

" + + post.response.getWriter.write(response) // // notify the next actor to coordinate the response // complete ! post } - else - { + else { post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')") } } } } -class ActionBActor(complete:ActorRef) extends Actor -{ +class ActionBActor(complete:ActorRef) extends Actor { import javax.ws.rs.core.MediaType - def receive = - { + def receive = { + // + // handle a post request + // + case post:Post => { // - // handle a post request + // the expected content type of the request + // similar to @Consumes // - case post:Post => - { + if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) { // - // the expected content type of the request - // similar to @Consumes + // pull some headers and form params // - if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) - { - // - // pull some headers and form params - // - def default(any: Any): String = {""} + def default(any: Any): String = "" val token = post.getHeaderOrElse("Test-Token", default) val data = post.getParameterOrElse("Data", default) - val (resp, status) = { - (token, data) match - { - case ("", _) => ("No token provided", HttpServletResponse.SC_FORBIDDEN) - case (_, "") => ("No data", HttpServletResponse.SC_ACCEPTED) - case _ => ("Data accepted", HttpServletResponse.SC_OK) - } + val (resp, status) = (token, data) match { + case ("", _) => ("No token provided", HttpServletResponse.SC_FORBIDDEN) + case (_, "") => ("No data", HttpServletResponse.SC_ACCEPTED) + case _ => ("Data accepted", HttpServletResponse.SC_OK) } // @@ -164,16 +148,13 @@ class ActionBActor(complete:ActorRef) extends Actor // complete ! (post, status) } - else - { + else { post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')") } } - case other if other.isInstanceOf[RequestMethod] => - { - other.asInstanceOf[RequestMethod].NotAllowed("Invalid method for this endpoint") - } + case other: RequestMethod => + other.NotAllowed("Invalid method for this endpoint") } } @@ -183,39 +164,24 @@ class ActionCompleteActor extends Actor val requests = HashMap.empty[Int, Int] - def receive = - { - case req:RequestMethod => - { + def receive = { + case req: RequestMethod => if (requests contains req.hashCode) - { complete(req) - } else requests += (req.hashCode -> 0) - } - case t:Tuple2[RequestMethod, Int] => - { + case t: Tuple2[RequestMethod, Int] => if (requests contains t._1.hashCode) - { complete(t._1) - } else requests += (t._1.hashCode -> t._2) - } } - def complete(req:RequestMethod) = - { - requests.get(req.hashCode) match - { - case Some(HttpServletResponse.SC_FORBIDDEN) => req.Forbidden("") - case Some(HttpServletResponse.SC_ACCEPTED) => req.Accepted("") - case Some(_) => req.OK("") - case _ => {} - } - - requests remove req.hashCode + def complete(req:RequestMethod) = requests.remove(req.hashCode) match { + case Some(HttpServletResponse.SC_FORBIDDEN) => req.Forbidden("") + case Some(HttpServletResponse.SC_ACCEPTED) => req.Accepted("") + case Some(_) => req.OK("") + case _ => } } \ No newline at end of file diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleAkkaAsyncHttpService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleAkkaAsyncHttpService.scala index 4fcb57de27..57aa3754b8 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleAkkaAsyncHttpService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleAkkaAsyncHttpService.scala @@ -18,54 +18,52 @@ import akka.http._ * * @author Garrick Evans */ -class SimpleAkkaAsyncHttpService extends Actor with Endpoint -{ - final val ServiceRoot = "/simple/" - final val ProvideSameActor = ServiceRoot + "same" - final val ProvideNewActor = ServiceRoot + "new" - - // +class SimpleAkkaAsyncHttpService extends Actor with Endpoint { + // // use the configurable dispatcher // self.dispatcher = Endpoint.Dispatcher - // - // there are different ways of doing this - in this case, we'll use a single hook function - // and discriminate in the provider; alternatively we can pair hooks & providers - // - def hook(uri: String): Boolean = ((uri == ProvideSameActor) || (uri == ProvideNewActor)) - def provide(uri: String): ActorRef = - { - if (uri == ProvideSameActor) - same - else - actorOf[BoringActor].start + final val ServiceRoot = "/simple/" + final val ProvideSameActor = ServiceRoot + "same" + final val ProvideNewActor = ServiceRoot + "new" + + // + // there are different ways of doing this - in this case, we'll use a single hook function + // and discriminate in the provider; alternatively we can pair hooks & providers + // + def hook(uri: String): Boolean = uri match { + case ProvideSameActor | ProvideNewActor => true + case _ => false } + def provide(uri: String): ActorRef = uri match { + case ProvideSameActor => same + case _ => actorOf[BoringActor].start + } + + // + // this is where you want attach your endpoint hooks + // + override def preStart { // - // this is where you want attach your endpoint hooks + // we expect there to be one root and that it's already been started up + // obviously there are plenty of other ways to obtaining this actor + // the point is that we need to attach something (for starters anyway) + // to the root // - override def preStart = - { - // - // we expect there to be one root and that it's already been started up - // obviously there are plenty of other ways to obtaining this actor - // the point is that we need to attach something (for starters anyway) - // to the root - // - val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head - root ! Endpoint.Attach(hook, provide) - } + val root = ActorRegistry.actorsFor(classOf[RootEndpoint]).head + root ! Endpoint.Attach(hook, provide) + } // // since this actor isn't doing anything else (i.e. not handling other messages) // just assign the receive func like so... // otherwise you could do something like: // def myrecv = {...} - // def receive = myrecv orElse _recv + // def receive = myrecv orElse handleHttpRequest // - def receive = _recv - + def receive = handleHttpRequest // // this will be our "same" actor provided with ProvideSameActor endpoint is hit @@ -76,57 +74,52 @@ class SimpleAkkaAsyncHttpService extends Actor with Endpoint /** * Define a service handler to respond to some HTTP requests */ -class BoringActor extends Actor -{ +class BoringActor extends Actor { import java.util.Date import javax.ws.rs.core.MediaType var gets = 0 var posts = 0 - var lastget:Option[Date] = None - var lastpost:Option[Date] = None + var lastget: Option[Date] = None + var lastpost: Option[Date] = None - def receive = - { - // - // handle a get request - // - case get:Get => - { - // - // the content type of the response. - // similar to @Produces annotation - // - get.response.setContentType(MediaType.TEXT_HTML) + def receive = { + // + // handle a get request + // + case get: Get => { + // + // the content type of the response. + // similar to @Produces annotation + // + get.response.setContentType(MediaType.TEXT_HTML) - // - // "work" - // - gets += 1 - lastget = Some(new Date) + // + // "work" + // + gets += 1 + lastget = Some(new Date) - // - // respond - // - val res = "

Gets: "+gets+" Posts: "+posts+"

Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"

" - get.OK(res) - } + // + // respond + // + val res = "

Gets: "+gets+" Posts: "+posts+"

Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"

" + get.OK(res) + } // // handle a post request // - case post:Post => - { + case post:Post => { + // + // the expected content type of the request + // similar to @Consumes + // + if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) { // - // the expected content type of the request - // similar to @Consumes + // the content type of the response. + // similar to @Produces annotation // - if (post.request.getContentType startsWith MediaType.APPLICATION_FORM_URLENCODED) - { - // - // the content type of the response. - // similar to @Produces annotation - // post.response.setContentType(MediaType.TEXT_HTML) // @@ -141,15 +134,11 @@ class BoringActor extends Actor val res = "

Gets: "+gets+" Posts: "+posts+"

Last Get: "+lastget.getOrElse("Never").toString+" Last Post: "+lastpost.getOrElse("Never").toString+"

" post.OK(res) } - else - { + else { post.UnsupportedMediaType("Content-Type request header missing or incorrect (was '" + post.request.getContentType + "' should be '" + MediaType.APPLICATION_FORM_URLENCODED + "')") } } - case other if other.isInstanceOf[RequestMethod] => - { - other.asInstanceOf[RequestMethod].NotAllowed("Invalid method for this endpoint") - } + case other: RequestMethod => other.NotAllowed("Invalid method for this endpoint") } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 42a4f1f06b..bf84369b38 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -8,7 +8,7 @@ akka { version = "1.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka. - enabled-modules = [] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] + enabled-modules = ["http"] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] time-unit = "seconds" # Time unit for all timeout properties throughout the config diff --git a/config/microkernel-server.xml b/config/microkernel-server.xml index 6a8ea4a67d..9c8901505f 100644 --- a/config/microkernel-server.xml +++ b/config/microkernel-server.xml @@ -73,7 +73,7 @@ / - akka.http.AkkaHttpServlet + akka.http.AkkaMistServlet /* diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 94225f90fb..8c6a6bb671 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -181,7 +181,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 - lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "compile" //CDDL v1 + lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1 lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile" //CDDL v1 lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile" //CDDL v1