diff --git a/akka-http/src/main/scala/AkkaHttpServlet.scala b/akka-http/src/main/scala/AkkaHttpServlet.scala new file mode 100644 index 0000000000..61f2070ab0 --- /dev/null +++ b/akka-http/src/main/scala/AkkaHttpServlet.scala @@ -0,0 +1,130 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + +import akka.util.Logging +import javax.servlet.http.HttpServlet + +/** + * @author Garrick Evans + */ +class AkkaHttpServlet extends HttpServlet with Logging +{ + import java.util. {Date, TimeZone} + import java.text.SimpleDateFormat + import javax.servlet.ServletConfig + import javax.servlet.http.{HttpServletRequest, HttpServletResponse} + import akka.actor.ActorRegistry + import Types._ + import AkkaHttpServlet._ + + + /** + * The root endpoint actor + */ + protected val _root = ActorRegistry.actorsFor(RootActorID).head + + /** + * Server-specific method factory + */ + protected var _factory:Option[RequestMethodFactory] = None + + /** + * Handles all servlet requests + */ + protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder:(()=>tAsyncRequestContext)=>RequestMethod) = + { + def suspend:tAsyncRequestContext = + { + // + // set to right now, which is effectively "already expired" + // + val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z") + gmt.setTimeZone(TimeZone.getTimeZone("GMT")) + + response.setHeader("Expires", gmt.format(new Date)) + response.setHeader("Cache-Control", "no-cache, must-revalidate") + + // + // no keep-alive? + // + if (ConnectionClose) response.setHeader("Connection","close") + + // + // suspend the request + // TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs + // + request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext] + } + + // + // shoot the message to the root endpoint for processing + // IMPORTANT: the suspend method is invoked on the server thread not in the actor + // + val method = builder(suspend _) + if (method.go) {_root ! method} + } + + + // + // HttpServlet API + // + + override def init(config: ServletConfig) = + { + super.init(config) + + val context = config.getServletContext + val server = context.getServerInfo + val (major, minor) = (context.getMajorVersion, context.getMinorVersion) + + log.info("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor) + + (major, minor) match { + + case (3,0) => { + log.info("Supporting Java asynchronous contexts.") + } + + case _ if (server.toLowerCase startsWith JettyServer) => { + + log.info("Supporting Jetty asynchronous continuations.") + _factory = Some(JettyContinuationMethodFactory) + } + + case _ => { + log.error("No asynchronous request handling can be supported.") + } + } + } + + protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Delete) + protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Get) + protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Head) + protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Options) + protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Post) + protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Put) + protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response) (_factory.get.Trace) +} + +object AkkaHttpServlet +{ + import akka.config.Config._ + + final val JettyServer = "jetty" + final val TimeoutAttribute = "timeout" + + val ConnectionClose = config.getBool("akka.rest.connection-close", true) + val RootActorBuiltin = config.getBool("akka.rest.root-actor-builtin", true) + val RootActorID = config.getString("akka.rest.root-actor-id", "_httproot") + val DefaultTimeout = config.getLong("akka.rest.timeout", 1000) + val ExpiredHeaderName = config.getString("akka.rest.expired-header-name", "Async-Timeout") + val ExpiredHeaderValue = config.getString("akka.rest.expired-header-value", "expired") +} + + + diff --git a/akka-http/src/main/scala/AsyncHttpServlet.scala b/akka-http/src/main/scala/AsyncHttpServlet.scala deleted file mode 100644 index 7e9ccb4855..0000000000 --- a/akka-http/src/main/scala/AsyncHttpServlet.scala +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. - */ - -package akka.http - -import akka.util.Logging -import javax.servlet.http.{HttpServletResponse, HttpServlet} - -/** - * @author Garrick Evans - */ -class AsyncHttpServlet extends HttpServlet with Logging -{ - import java.util. {Date, TimeZone} - import java.text.SimpleDateFormat - import javax.servlet.ServletConfig - import javax.servlet.http.{HttpServletRequest, HttpServletResponse} - import akka.actor.ActorRegistry - import Types._ - - // - // the root endpoint for this servlet will have been booted already - // use the system property to find out the actor id and cache him - // TODO: currently this is hardcoded but really use a property - // - protected val _root = ActorRegistry.actorsFor("DefaultGridRoot").head - - /** - * Handles the HTTP request method on the servlet, suspends the connection and sends the asynchronous context - * along to the root endpoint in a SuspendedRequest message - */ - protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder: (()=>Option[tAsyncContext]) => SuspendedRequest) = - { - def suspend:Option[tAsyncContext] = - { - // - // set to effectively "already expired" - // - val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z") - gmt.setTimeZone(TimeZone.getTimeZone("GMT")) - - response.setHeader("Expires", gmt.format(new Date)) - response.setHeader("Cache-Control", "no-cache, must-revalidate") - response.setHeader("Connection","close") - - Some(request.asInstanceOf[tAsyncRequest].startAsync) - } - - // - // shoot the message to the root endpoint for processing - // IMPORTANT: the suspend method is invoked on the jetty thread not in the actor - // - val msg = builder(suspend _) - if (msg.context ne None) {_root ! msg} - } - - /** - * Subclasses can choose to have the servlet listen to the async context events - * @return A type of either AsyncListener or ContinuationListener - */ - def hook:Option[AnyRef] = None - - - // - // HttpServlet API - // - - final val Jetty7Server = "Jetty(7" - - override def init(config: ServletConfig) = - { - super.init(config) - - val context = config.getServletContext - val server = context.getServerInfo - val (major, minor) = (context.getMajorVersion, context.getMinorVersion) - - log.debug("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor) - - (major, minor) match { - - case (3,0) => { - log.debug("Supporting Java asynchronous contexts.") - } - - case (2,5) if (server startsWith Jetty7Server) => { - log.debug("Supporting Jetty asynchronous continuations.") - - } - - case _ => { - log.error("No asynchronous request handling can be supported.") - } - } - } - - - protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Delete(f, hook _)) - protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Get(f, hook _)) - protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Head(f, hook _)) - protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Options(f, hook _)) - protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Post(f, hook _)) - protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Put(f, hook _)) - protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Trace(f, hook _)) -} - diff --git a/akka-http/src/main/scala/ContinuationSupport.scala b/akka-http/src/main/scala/ContinuationSupport.scala deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/akka-http/src/main/scala/Endpoint.scala b/akka-http/src/main/scala/Endpoint.scala index dd00ee4bd9..42c6d02758 100644 --- a/akka-http/src/main/scala/Endpoint.scala +++ b/akka-http/src/main/scala/Endpoint.scala @@ -55,9 +55,9 @@ trait Endpoint // // dispatch the suspended requests // - case msg if msg.isInstanceOf[SuspendedRequest] => + case msg if msg.isInstanceOf[RequestMethod] => { - val req = msg.asInstanceOf[SuspendedRequest] + val req = msg.asInstanceOf[RequestMethod] val uri = req.request.getRequestURI val endpoints = _attachments.filter {_._1(uri)} @@ -78,7 +78,7 @@ trait Endpoint /** * no endpoint available - completes the request with a 404 */ - protected def _na(uri: String, req: SuspendedRequest) = + protected def _na(uri: String, req: RequestMethod) = { req.NotFound("No endpoint available for [" + uri + "]") log.debug("No endpoint available for [" + uri + "]") @@ -89,6 +89,7 @@ trait Endpoint class RootEndpoint extends Actor with Endpoint { import Endpoint._ + import AkkaHttpServlet._ final val Root = "/" @@ -97,6 +98,11 @@ class RootEndpoint extends Actor with Endpoint // self.dispatcher = Endpoint.Dispatcher + // + // adopt the configured id + // + if (RootActorBuiltin) self.id = RootActorID + override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments def recv: Receive = @@ -130,5 +136,5 @@ object Endpoint type Provider = Function[String, ActorRef] case class Attach(hook: Hook, provider: Provider) - case class NoneAvailable(uri: String, req: SuspendedRequest) + case class NoneAvailable(uri: String, req: RequestMethod) } diff --git a/akka-http/src/main/scala/JettyContinuation.scala b/akka-http/src/main/scala/JettyContinuation.scala new file mode 100644 index 0000000000..6c753382f2 --- /dev/null +++ b/akka-http/src/main/scala/JettyContinuation.scala @@ -0,0 +1,140 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + +import org.eclipse.jetty.server._ +import org.eclipse.jetty.continuation._ +import Types._ + + +/** + * @author Garrick Evans + */ +trait JettyContinuation extends ContinuationListener with akka.util.Logging +{ + import javax.servlet.http.HttpServletResponse + import AkkaHttpServlet._ + + val builder:()=>tAsyncRequestContext + val context:Option[tAsyncRequestContext] = Some(builder()) + def go = {_continuation.isDefined} + + protected val _continuation:Option[AsyncContinuation] = { + + val continuation = context.get.asInstanceOf[AsyncContinuation] + + (continuation.isInitial, + continuation.isSuspended, + continuation.isExpired) match { + + // + // the fresh continuation (coming through getAsyncContinuation) + // + case (true, false, false) => { + + continuation.setTimeout(DefaultTimeout) + + //callback() foreach {listener => continuation.addContinuationListener(listener.asInstanceOf[ContinuationListener])} + continuation.addContinuationListener(this) + continuation.suspend + + Some(continuation) + } + // + // the fresh continuation (coming through startAsync instead) + // + case (true, true, false) => { + + continuation.setTimeout(DefaultTimeout) + + //callback() foreach {listener => continuation.addContinuationListener(listener.asInstanceOf[ContinuationListener])} + continuation.addContinuationListener(this) + + Some(continuation) + } + // + // the timeout was reset and the continuation was resumed + // need to update the timeout and resuspend + // very important to clear the context so the request is not rebroadcast to the endpoint + // + case (false, false, false) => { + + continuation.setTimeout(continuation.getAttribute(TimeoutAttribute).asInstanceOf[Long]) + continuation.suspend + continuation.removeAttribute(TimeoutAttribute) + + None + } + // + // we don't actually expect to get this one here since the listener will finish him off + // + case (_, _, true) => { + + None + } + // + // snuh? + // + case _ => { + + continuation.cancel + + None + } + } + } + + def suspended:Boolean = + { + _continuation match { + + case Some(continuation) => (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) != null)) + case None => { + false + } + } + } + + def timeout(ms:Long):Boolean = + { + _continuation match { + + case Some(continuation) => { + + continuation.setAttribute(TimeoutAttribute, ms) + continuation.resume + true + } + + case None => false + } + } + + // + // ContinuationListener + // + + def onComplete(c:Continuation) = {} + def onTimeout(c:Continuation) = + { + c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue) + c.complete + } +} + +object JettyContinuationMethodFactory extends RequestMethodFactory +{ + def Delete(f:(()=>tAsyncRequestContext)):RequestMethod = {new Delete(f) with JettyContinuation} + def Get(f:(()=>tAsyncRequestContext)):RequestMethod = {new Get(f) with JettyContinuation} + def Head(f:(()=>tAsyncRequestContext)):RequestMethod = {new Head(f) with JettyContinuation} + def Options(f:(()=>tAsyncRequestContext)):RequestMethod = {new Options(f) with JettyContinuation} + def Post(f:(()=>tAsyncRequestContext)):RequestMethod = {new Post(f) with JettyContinuation} + def Put(f:(()=>tAsyncRequestContext)):RequestMethod = {new Put(f) with JettyContinuation} + def Trace(f:(()=>tAsyncRequestContext)):RequestMethod = {new Trace(f) with JettyContinuation} +} + + diff --git a/akka-http/src/main/scala/RequestMethod.scala b/akka-http/src/main/scala/RequestMethod.scala new file mode 100644 index 0000000000..ba86336521 --- /dev/null +++ b/akka-http/src/main/scala/RequestMethod.scala @@ -0,0 +1,186 @@ +/** + * Copyright 2010 Autodesk, Inc. All rights reserved. + * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. + * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. + */ + +package akka.http + + +import akka.util.Logging +import Types._ + + +/** + * Basic description of the suspended async http request. + * Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations) + * + * @author Garrick Evans + */ +trait RequestMethod extends Logging +{ + import java.io.IOException + import javax.servlet.http.{HttpServletResponse, HttpServletRequest} + + // + // required implementations + // + + val builder:()=>tAsyncRequestContext + + /** + * Provides a general type for the underlying context + * + * @return a completable request context + */ + val context:Option[tAsyncRequestContext] + def go:Boolean + + /** + * Updates (resets) the timeout + * + * @return true if updated, false if not supported + */ + def timeout(ms:Long):Boolean + + /** + * Status of the suspension + */ + def suspended:Boolean + + // + // convenience funcs + // + + def request = context.get.getRequest.asInstanceOf[HttpServletRequest] + def response = context.get.getResponse.asInstanceOf[HttpServletResponse] + + def getHeaderOrElse(name: String, default: Function[Any, String]): String = + { + request.getHeader(name) match { + + case null => default(null) + case s => s + } + } + + def getParameterOrElse(name: String, default: Function[Any, String]): String = + { + request.getParameter(name) match { + + case null => default(null) + case s => s + } + } + + + def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]()) + + def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean = + { + var ok = false + context match { + + case Some(pipe) => { + try + { + if (!suspended) + { + log.warning("Attempt to complete an expired connection.") + } + else + { + response.setStatus(status) + headers foreach {h => response.setHeader(h._1, h._2)} + response.getWriter.write(body) + response.getWriter.close + response.flushBuffer + pipe.complete + ok = true + } + } + catch + { + case io:IOException => log.error(io, "Failed to write data to connection on resume - the client probably disconnected") + } + } + + case None => { + log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")") + } + } + ok +} + + def complete(t: Throwable): Unit = + { + var status = 0 + context match { + + case Some(pipe) => { + try + { + if (!suspended) + { + log.warning("Attempt to complete an expired connection.") + } + else + { + status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR + response.sendError(status, "Failed to write data to connection on resume") + pipe.complete + } + } + catch + { + case io:IOException => log.error(io, "Request completed with internal error.") + } + finally + { + log.error(t, "Request completed with internal error.") + } + } + + case None => { + log.error(t, "Attempt to complete request with no context") + } + } + } + + + def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body) + def OK(body: String, headers:List[Tuple2[String,String]]): Boolean = complete(HttpServletResponse.SC_OK, body, headers) + def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body) + def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body) + def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body) + def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body) + def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body) + def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body) + def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body) + def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body) + def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body) + def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body) + def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body) + def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body) + def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body) + def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString))) +} + +abstract class Delete(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f} +abstract class Get(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f} +abstract class Head(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f} +abstract class Options(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f} +abstract class Post(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f} +abstract class Put(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f} +abstract class Trace(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f} + +trait RequestMethodFactory +{ + def Delete(f:(()=>tAsyncRequestContext)):RequestMethod + def Get(f:(()=>tAsyncRequestContext)):RequestMethod + def Head(f:(()=>tAsyncRequestContext)):RequestMethod + def Options(f:(()=>tAsyncRequestContext)):RequestMethod + def Post(f:(()=>tAsyncRequestContext)):RequestMethod + def Put(f:(()=>tAsyncRequestContext)):RequestMethod + def Trace(f:(()=>tAsyncRequestContext)):RequestMethod +} diff --git a/akka-http/src/main/scala/SuspendedRequest.scala b/akka-http/src/main/scala/SuspendedRequest.scala deleted file mode 100644 index 2f3ecf7912..0000000000 --- a/akka-http/src/main/scala/SuspendedRequest.scala +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. - */ - -package akka.http - - -import akka.util.Logging -import Types._ - - -/** - * @author Garrick Evans - */ -trait SuspendedRequest extends Logging -{ - import javax.servlet.http.{HttpServletResponse, HttpServletRequest} - import org.eclipse.jetty.server._ - import org.eclipse.jetty.continuation._ - - final val Timeout = "timeout" - final val DefaultTimeout = 30000 - - var context: Option[tAsyncContext] = None - - def init(suspend:()=>Option[tAsyncContext], callback:()=>Option[AnyRef]) = - { - suspend() match - { - case (Some(continuation)) => - { - context = Some(continuation) - val ac = continuation.asInstanceOf[AsyncContinuation] - - (ac.isInitial, ac.isSuspended, ac.isExpired) match - { - // - // the fresh continuation - // - case (true, false, false) => - { - ac.setTimeout(DefaultTimeout) - - //callback() foreach {listener => ac.addContinuationListener(listener)} - //ac.addContinuationListener(this) - ac.suspend - } - // - // the timeout was reset and the continuation was resumed - // need to update the timeout and resuspend - // very important to clear the context so the request is not rebroadcast to the endpoint - // - case (false, false, false) => - { - ac.setTimeout(ac.getAttribute(Timeout).asInstanceOf[Long]) - ac.suspend - ac.removeAttribute(Timeout) - - context = None - log.debug("Updating and re-suspending request. TIMEOUT ("+ac.getTimeout+" ms)") - } - // - // we don't actually expect to get this one here since the listener will finish him off - // - case (_, _, true) => - { - response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT) - context = None - log.warning("Expired request arrived here unexpectedly. REQUEST ("+continuation.toString+")") - } - case unknown => - { - log.error("Unexpected continuation state detected - cancelling") - ac.cancel - context = None - } - } - } - case _ => - { - log.error("Cannot initialize request without an asynchronous context.") - } - } - } - - def request = context.get.getRequest.asInstanceOf[HttpServletRequest] - def response = context.get.getResponse.asInstanceOf[HttpServletResponse] - def suspended = - { - context match - { - case Some(continuation) => - { - val ac = continuation.asInstanceOf[AsyncContinuation] - (ac.isSuspended || (ac.getAttribute(Timeout) != null)) - } - case None => false - } - } - - def getHeaderOrElse(name: String, default: Function[Any, String]): String = - { - request.getHeader(name) match - { - case null => default(null) - case s => s - } - } - - def getParameterOrElse(name: String, default: Function[Any, String]): String = - { - request.getParameter(name) match - { - case null => default(null) - case s => s - } - } - - /** - * Allow for an updatable timeout - */ - def timeout(ms:Long):Unit = - { - context match - { - case Some(continuation) => - { - continuation.asInstanceOf[AsyncContinuation].setAttribute(Timeout, ms) - continuation.asInstanceOf[AsyncContinuation].resume - } - case None => log.error("Cannot update the timeout on an unsuspended request") - } - } - - def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]()) - - def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean = - { - var ok = false - context match - { - case Some(pipe) => - { - try - { - if (!suspended) - { - log.warning("Attempt to complete an expired connection.") - } - else - { - response.setStatus(status) - headers foreach {h => response.setHeader(h._1, h._2)} - response.getWriter.write(body) - response.getWriter.close - response.flushBuffer - pipe.complete - ok = true - } - } - catch - { - case ex => log.error(ex, "Failed to write data to connection on resume - the client probably disconnected") - } - finally - { - context = None - } - } - case None => - { - log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")") - } - } - ok - } - - def complete(t: Throwable): Unit = - { - var status = 0 - context match - { - case Some(pipe) => - { - try - { - if (!suspended) - { - log.warning("Attempt to complete an expired connection.") - } - else - { - status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR - response.sendError(status, "Failed to write data to connection on resume") - pipe.complete - } - } - catch - { - case ex => log.error(ex, "Request completed with internal error.") - } - finally - { - context = None - log.error(t, "Request completed with internal error.") - } - } - case None => - { - log.error(t, "Attempt to complete request with no context") - } - } - } - - def onComplete(c:Continuation) = {} - def onTimeout(c:Continuation) = - { - c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader("Suspend","Timeout") - c.complete - log.debug("Request expired. CONTEXT ("+c+")") - } - - - def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body) - def OK(body: String, headers:List[Tuple2[String,String]]): Boolean = complete(HttpServletResponse.SC_OK, body, headers) - def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body) - def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body) - def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body) - def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body) - def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body) - def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body) - def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body) - def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body) - def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body) - def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body) - def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body) - def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body) - def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body) - def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString))) -} - - -case class Delete(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} -case class Get(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} -case class Head(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} -case class Options(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} -case class Post(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} -case class Put(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} -case class Trace(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)} - diff --git a/akka-http/src/main/scala/Types.scala b/akka-http/src/main/scala/Types.scala index ba423194a3..d77920b82e 100644 --- a/akka-http/src/main/scala/Types.scala +++ b/akka-http/src/main/scala/Types.scala @@ -14,37 +14,18 @@ package akka.http */ object Types { - import javax.servlet. {ServletContext, ServletRequest, ServletResponse} + import javax.servlet. {ServletRequest, ServletResponse} type tAsyncRequest = { - def startAsync:tAsyncContext + def startAsync:tAsyncRequestContext } - type tAsyncContext = { + /** + * Used to match both AsyncContext and AsyncContinuation in order to complete the request + */ + type tAsyncRequestContext = { def complete:Unit - def dispatch:Unit - def dispatch(s:String):Unit - def dispatch(c:ServletContext, s:String) def getRequest:ServletRequest def getResponse:ServletResponse - def hasOriginalRequestAndResponse:Boolean - def setTimeout(ms:Long):Unit - def start(r:Runnable):Unit - } - - type tContinuation = { - def complete:Unit - def isExpired:Boolean - def isInitial:Boolean - def isResumed:Boolean - def isSuspended:Boolean - def resume:Unit - def suspend:Unit - def undispatch:Unit - } - - type tContinuationListener = { - def onComplete(c:tContinuation) - def onTimeout(c:tContinuation) } } \ No newline at end of file diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala index 38087b03db..1eeb9348a3 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/Boot.scala @@ -4,7 +4,7 @@ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. */ -package sample.mist +package sample.rest.scala import akka.actor._ import akka.actor.Actor._ @@ -19,27 +19,16 @@ class Boot { val factory = SupervisorFactory( SupervisorConfig( OneForOneStrategy(List(classOf[Exception]), 3, 100), + // + // in this particular case, just boot the built-in default root endpoint + // Supervise( - actorOf[ServiceRoot], + actorOf[RootEndpoint], Permanent) :: Supervise( - actorOf[SimpleService], + actorOf[SimpleAkkaAsyncHttpService], Permanent) :: Nil)) factory.newInstance.start } - -class ServiceRoot extends RootEndpoint -{ - // - // use the configurable dispatcher - // - self.dispatcher = Endpoint.Dispatcher - - // - // TODO: make this a config prop - // - self.id = "DefaultRootEndpoint" -} - diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService2.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleAkkaAsyncHttpService.scala similarity index 94% rename from akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService2.scala rename to akka-samples/akka-sample-rest-scala/src/main/scala/SimpleAkkaAsyncHttpService.scala index 333cdb3c86..4fcb57de27 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService2.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleAkkaAsyncHttpService.scala @@ -4,7 +4,7 @@ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. */ -package sample.mist +package sample.rest.scala import akka.actor._ import akka.actor.Actor._ @@ -18,7 +18,7 @@ import akka.http._ * * @author Garrick Evans */ -class SimpleService extends Actor with Endpoint +class SimpleAkkaAsyncHttpService extends Actor with Endpoint { final val ServiceRoot = "/simple/" final val ProvideSameActor = ServiceRoot + "same" @@ -147,9 +147,9 @@ class BoringActor extends Actor } } - case other if other.isInstanceOf[SuspendedRequest] => + case other if other.isInstanceOf[RequestMethod] => { - other.asInstanceOf[SuspendedRequest].NotAllowed("Invalid method for this endpoint") + other.asInstanceOf[RequestMethod].NotAllowed("Invalid method for this endpoint") } } } diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index ccfd6fd286..896c405cb2 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -22,7 +22,7 @@ import org.atmosphere.util.XSSHtmlFilter import org.atmosphere.cpr.{Broadcaster, BroadcastFilter} import org.atmosphere.jersey.Broadcastable -class Boot { +class BootPrev { val factory = SupervisorFactory( SupervisorConfig( OneForOneStrategy(List(classOf[Exception]), 3, 100), diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 16aecf6bc9..f379cccb64 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -99,6 +99,13 @@ akka { } # maxInactiveActivity = 60000 # Atmosphere CometSupport maxInactiveActivity + connection-close = true # toggles the addition of the "Connection" response header with a "close" value + root-actor-id = "_httproot" # the id of the actor to use as the root endpoint + root-actor-builtin = true # toggles the use of the built-in root endpoint base class + timeout = 1000 # the default timeout for all async requests (in ms) + expired-header-name = "Async-Timeout" # the name of the response header to use when an async request expires + expired-header-value = "expired" # the value of the response header to use when an async request expires + # Uncomment if you are using the KerberosAuthenticationActor # kerberos { # servicePrincipal = "HTTP/localhost@EXAMPLE.COM" diff --git a/config/microkernel-server.xml b/config/microkernel-server.xml index 987dd22943..6a8ea4a67d 100644 --- a/config/microkernel-server.xml +++ b/config/microkernel-server.xml @@ -73,7 +73,7 @@ / - akka.comet.AkkaServlet + akka.http.AkkaHttpServlet /*