most of the refactoring done and jetty is working again (need to check updating timeouts, etc); servlet 3.0 impl next
This commit is contained in:
parent
5ff7b8517f
commit
92df463e92
13 changed files with 491 additions and 413 deletions
130
akka-http/src/main/scala/AkkaHttpServlet.scala
Normal file
130
akka-http/src/main/scala/AkkaHttpServlet.scala
Normal file
|
|
@ -0,0 +1,130 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
import akka.util.Logging
|
||||
import javax.servlet.http.HttpServlet
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
class AkkaHttpServlet extends HttpServlet with Logging
|
||||
{
|
||||
import java.util. {Date, TimeZone}
|
||||
import java.text.SimpleDateFormat
|
||||
import javax.servlet.ServletConfig
|
||||
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
|
||||
import akka.actor.ActorRegistry
|
||||
import Types._
|
||||
import AkkaHttpServlet._
|
||||
|
||||
|
||||
/**
|
||||
* The root endpoint actor
|
||||
*/
|
||||
protected val _root = ActorRegistry.actorsFor(RootActorID).head
|
||||
|
||||
/**
|
||||
* Server-specific method factory
|
||||
*/
|
||||
protected var _factory:Option[RequestMethodFactory] = None
|
||||
|
||||
/**
|
||||
* Handles all servlet requests
|
||||
*/
|
||||
protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder:(()=>tAsyncRequestContext)=>RequestMethod) =
|
||||
{
|
||||
def suspend:tAsyncRequestContext =
|
||||
{
|
||||
//
|
||||
// set to right now, which is effectively "already expired"
|
||||
//
|
||||
val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
|
||||
gmt.setTimeZone(TimeZone.getTimeZone("GMT"))
|
||||
|
||||
response.setHeader("Expires", gmt.format(new Date))
|
||||
response.setHeader("Cache-Control", "no-cache, must-revalidate")
|
||||
|
||||
//
|
||||
// no keep-alive?
|
||||
//
|
||||
if (ConnectionClose) response.setHeader("Connection","close")
|
||||
|
||||
//
|
||||
// suspend the request
|
||||
// TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
|
||||
//
|
||||
request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
|
||||
}
|
||||
|
||||
//
|
||||
// shoot the message to the root endpoint for processing
|
||||
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
|
||||
//
|
||||
val method = builder(suspend _)
|
||||
if (method.go) {_root ! method}
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// HttpServlet API
|
||||
//
|
||||
|
||||
override def init(config: ServletConfig) =
|
||||
{
|
||||
super.init(config)
|
||||
|
||||
val context = config.getServletContext
|
||||
val server = context.getServerInfo
|
||||
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
|
||||
|
||||
log.info("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor)
|
||||
|
||||
(major, minor) match {
|
||||
|
||||
case (3,0) => {
|
||||
log.info("Supporting Java asynchronous contexts.")
|
||||
}
|
||||
|
||||
case _ if (server.toLowerCase startsWith JettyServer) => {
|
||||
|
||||
log.info("Supporting Jetty asynchronous continuations.")
|
||||
_factory = Some(JettyContinuationMethodFactory)
|
||||
}
|
||||
|
||||
case _ => {
|
||||
log.error("No asynchronous request handling can be supported.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Delete)
|
||||
protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Get)
|
||||
protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Head)
|
||||
protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Options)
|
||||
protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Post)
|
||||
protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Put)
|
||||
protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response) (_factory.get.Trace)
|
||||
}
|
||||
|
||||
object AkkaHttpServlet
|
||||
{
|
||||
import akka.config.Config._
|
||||
|
||||
final val JettyServer = "jetty"
|
||||
final val TimeoutAttribute = "timeout"
|
||||
|
||||
val ConnectionClose = config.getBool("akka.rest.connection-close", true)
|
||||
val RootActorBuiltin = config.getBool("akka.rest.root-actor-builtin", true)
|
||||
val RootActorID = config.getString("akka.rest.root-actor-id", "_httproot")
|
||||
val DefaultTimeout = config.getLong("akka.rest.timeout", 1000)
|
||||
val ExpiredHeaderName = config.getString("akka.rest.expired-header-name", "Async-Timeout")
|
||||
val ExpiredHeaderValue = config.getString("akka.rest.expired-header-value", "expired")
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,109 +0,0 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
import akka.util.Logging
|
||||
import javax.servlet.http.{HttpServletResponse, HttpServlet}
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
class AsyncHttpServlet extends HttpServlet with Logging
|
||||
{
|
||||
import java.util. {Date, TimeZone}
|
||||
import java.text.SimpleDateFormat
|
||||
import javax.servlet.ServletConfig
|
||||
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
|
||||
import akka.actor.ActorRegistry
|
||||
import Types._
|
||||
|
||||
//
|
||||
// the root endpoint for this servlet will have been booted already
|
||||
// use the system property to find out the actor id and cache him
|
||||
// TODO: currently this is hardcoded but really use a property
|
||||
//
|
||||
protected val _root = ActorRegistry.actorsFor("DefaultGridRoot").head
|
||||
|
||||
/**
|
||||
* Handles the HTTP request method on the servlet, suspends the connection and sends the asynchronous context
|
||||
* along to the root endpoint in a SuspendedRequest message
|
||||
*/
|
||||
protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder: (()=>Option[tAsyncContext]) => SuspendedRequest) =
|
||||
{
|
||||
def suspend:Option[tAsyncContext] =
|
||||
{
|
||||
//
|
||||
// set to effectively "already expired"
|
||||
//
|
||||
val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
|
||||
gmt.setTimeZone(TimeZone.getTimeZone("GMT"))
|
||||
|
||||
response.setHeader("Expires", gmt.format(new Date))
|
||||
response.setHeader("Cache-Control", "no-cache, must-revalidate")
|
||||
response.setHeader("Connection","close")
|
||||
|
||||
Some(request.asInstanceOf[tAsyncRequest].startAsync)
|
||||
}
|
||||
|
||||
//
|
||||
// shoot the message to the root endpoint for processing
|
||||
// IMPORTANT: the suspend method is invoked on the jetty thread not in the actor
|
||||
//
|
||||
val msg = builder(suspend _)
|
||||
if (msg.context ne None) {_root ! msg}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can choose to have the servlet listen to the async context events
|
||||
* @return A type of either AsyncListener or ContinuationListener
|
||||
*/
|
||||
def hook:Option[AnyRef] = None
|
||||
|
||||
|
||||
//
|
||||
// HttpServlet API
|
||||
//
|
||||
|
||||
final val Jetty7Server = "Jetty(7"
|
||||
|
||||
override def init(config: ServletConfig) =
|
||||
{
|
||||
super.init(config)
|
||||
|
||||
val context = config.getServletContext
|
||||
val server = context.getServerInfo
|
||||
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
|
||||
|
||||
log.debug("Initializing Akka HTTP on "+server+" with Servlet API "+major+"."+minor)
|
||||
|
||||
(major, minor) match {
|
||||
|
||||
case (3,0) => {
|
||||
log.debug("Supporting Java asynchronous contexts.")
|
||||
}
|
||||
|
||||
case (2,5) if (server startsWith Jetty7Server) => {
|
||||
log.debug("Supporting Jetty asynchronous continuations.")
|
||||
|
||||
}
|
||||
|
||||
case _ => {
|
||||
log.error("No asynchronous request handling can be supported.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Delete(f, hook _))
|
||||
protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Get(f, hook _))
|
||||
protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Head(f, hook _))
|
||||
protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Options(f, hook _))
|
||||
protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Post(f, hook _))
|
||||
protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Put(f, hook _))
|
||||
protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)((f:(()=>Option[tAsyncContext])) => Trace(f, hook _))
|
||||
}
|
||||
|
||||
|
|
@ -55,9 +55,9 @@ trait Endpoint
|
|||
//
|
||||
// dispatch the suspended requests
|
||||
//
|
||||
case msg if msg.isInstanceOf[SuspendedRequest] =>
|
||||
case msg if msg.isInstanceOf[RequestMethod] =>
|
||||
{
|
||||
val req = msg.asInstanceOf[SuspendedRequest]
|
||||
val req = msg.asInstanceOf[RequestMethod]
|
||||
val uri = req.request.getRequestURI
|
||||
val endpoints = _attachments.filter {_._1(uri)}
|
||||
|
||||
|
|
@ -78,7 +78,7 @@ trait Endpoint
|
|||
/**
|
||||
* no endpoint available - completes the request with a 404
|
||||
*/
|
||||
protected def _na(uri: String, req: SuspendedRequest) =
|
||||
protected def _na(uri: String, req: RequestMethod) =
|
||||
{
|
||||
req.NotFound("No endpoint available for [" + uri + "]")
|
||||
log.debug("No endpoint available for [" + uri + "]")
|
||||
|
|
@ -89,6 +89,7 @@ trait Endpoint
|
|||
class RootEndpoint extends Actor with Endpoint
|
||||
{
|
||||
import Endpoint._
|
||||
import AkkaHttpServlet._
|
||||
|
||||
final val Root = "/"
|
||||
|
||||
|
|
@ -97,6 +98,11 @@ class RootEndpoint extends Actor with Endpoint
|
|||
//
|
||||
self.dispatcher = Endpoint.Dispatcher
|
||||
|
||||
//
|
||||
// adopt the configured id
|
||||
//
|
||||
if (RootActorBuiltin) self.id = RootActorID
|
||||
|
||||
override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
|
||||
|
||||
def recv: Receive =
|
||||
|
|
@ -130,5 +136,5 @@ object Endpoint
|
|||
type Provider = Function[String, ActorRef]
|
||||
|
||||
case class Attach(hook: Hook, provider: Provider)
|
||||
case class NoneAvailable(uri: String, req: SuspendedRequest)
|
||||
case class NoneAvailable(uri: String, req: RequestMethod)
|
||||
}
|
||||
|
|
|
|||
140
akka-http/src/main/scala/JettyContinuation.scala
Normal file
140
akka-http/src/main/scala/JettyContinuation.scala
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
import org.eclipse.jetty.server._
|
||||
import org.eclipse.jetty.continuation._
|
||||
import Types._
|
||||
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait JettyContinuation extends ContinuationListener with akka.util.Logging
|
||||
{
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
import AkkaHttpServlet._
|
||||
|
||||
val builder:()=>tAsyncRequestContext
|
||||
val context:Option[tAsyncRequestContext] = Some(builder())
|
||||
def go = {_continuation.isDefined}
|
||||
|
||||
protected val _continuation:Option[AsyncContinuation] = {
|
||||
|
||||
val continuation = context.get.asInstanceOf[AsyncContinuation]
|
||||
|
||||
(continuation.isInitial,
|
||||
continuation.isSuspended,
|
||||
continuation.isExpired) match {
|
||||
|
||||
//
|
||||
// the fresh continuation (coming through getAsyncContinuation)
|
||||
//
|
||||
case (true, false, false) => {
|
||||
|
||||
continuation.setTimeout(DefaultTimeout)
|
||||
|
||||
//callback() foreach {listener => continuation.addContinuationListener(listener.asInstanceOf[ContinuationListener])}
|
||||
continuation.addContinuationListener(this)
|
||||
continuation.suspend
|
||||
|
||||
Some(continuation)
|
||||
}
|
||||
//
|
||||
// the fresh continuation (coming through startAsync instead)
|
||||
//
|
||||
case (true, true, false) => {
|
||||
|
||||
continuation.setTimeout(DefaultTimeout)
|
||||
|
||||
//callback() foreach {listener => continuation.addContinuationListener(listener.asInstanceOf[ContinuationListener])}
|
||||
continuation.addContinuationListener(this)
|
||||
|
||||
Some(continuation)
|
||||
}
|
||||
//
|
||||
// the timeout was reset and the continuation was resumed
|
||||
// need to update the timeout and resuspend
|
||||
// very important to clear the context so the request is not rebroadcast to the endpoint
|
||||
//
|
||||
case (false, false, false) => {
|
||||
|
||||
continuation.setTimeout(continuation.getAttribute(TimeoutAttribute).asInstanceOf[Long])
|
||||
continuation.suspend
|
||||
continuation.removeAttribute(TimeoutAttribute)
|
||||
|
||||
None
|
||||
}
|
||||
//
|
||||
// we don't actually expect to get this one here since the listener will finish him off
|
||||
//
|
||||
case (_, _, true) => {
|
||||
|
||||
None
|
||||
}
|
||||
//
|
||||
// snuh?
|
||||
//
|
||||
case _ => {
|
||||
|
||||
continuation.cancel
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def suspended:Boolean =
|
||||
{
|
||||
_continuation match {
|
||||
|
||||
case Some(continuation) => (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) != null))
|
||||
case None => {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def timeout(ms:Long):Boolean =
|
||||
{
|
||||
_continuation match {
|
||||
|
||||
case Some(continuation) => {
|
||||
|
||||
continuation.setAttribute(TimeoutAttribute, ms)
|
||||
continuation.resume
|
||||
true
|
||||
}
|
||||
|
||||
case None => false
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// ContinuationListener
|
||||
//
|
||||
|
||||
def onComplete(c:Continuation) = {}
|
||||
def onTimeout(c:Continuation) =
|
||||
{
|
||||
c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
|
||||
c.complete
|
||||
}
|
||||
}
|
||||
|
||||
object JettyContinuationMethodFactory extends RequestMethodFactory
|
||||
{
|
||||
def Delete(f:(()=>tAsyncRequestContext)):RequestMethod = {new Delete(f) with JettyContinuation}
|
||||
def Get(f:(()=>tAsyncRequestContext)):RequestMethod = {new Get(f) with JettyContinuation}
|
||||
def Head(f:(()=>tAsyncRequestContext)):RequestMethod = {new Head(f) with JettyContinuation}
|
||||
def Options(f:(()=>tAsyncRequestContext)):RequestMethod = {new Options(f) with JettyContinuation}
|
||||
def Post(f:(()=>tAsyncRequestContext)):RequestMethod = {new Post(f) with JettyContinuation}
|
||||
def Put(f:(()=>tAsyncRequestContext)):RequestMethod = {new Put(f) with JettyContinuation}
|
||||
def Trace(f:(()=>tAsyncRequestContext)):RequestMethod = {new Trace(f) with JettyContinuation}
|
||||
}
|
||||
|
||||
|
||||
186
akka-http/src/main/scala/RequestMethod.scala
Normal file
186
akka-http/src/main/scala/RequestMethod.scala
Normal file
|
|
@ -0,0 +1,186 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
|
||||
import akka.util.Logging
|
||||
import Types._
|
||||
|
||||
|
||||
/**
|
||||
* Basic description of the suspended async http request.
|
||||
* Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations)
|
||||
*
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait RequestMethod extends Logging
|
||||
{
|
||||
import java.io.IOException
|
||||
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
||||
|
||||
//
|
||||
// required implementations
|
||||
//
|
||||
|
||||
val builder:()=>tAsyncRequestContext
|
||||
|
||||
/**
|
||||
* Provides a general type for the underlying context
|
||||
*
|
||||
* @return a completable request context
|
||||
*/
|
||||
val context:Option[tAsyncRequestContext]
|
||||
def go:Boolean
|
||||
|
||||
/**
|
||||
* Updates (resets) the timeout
|
||||
*
|
||||
* @return true if updated, false if not supported
|
||||
*/
|
||||
def timeout(ms:Long):Boolean
|
||||
|
||||
/**
|
||||
* Status of the suspension
|
||||
*/
|
||||
def suspended:Boolean
|
||||
|
||||
//
|
||||
// convenience funcs
|
||||
//
|
||||
|
||||
def request = context.get.getRequest.asInstanceOf[HttpServletRequest]
|
||||
def response = context.get.getResponse.asInstanceOf[HttpServletResponse]
|
||||
|
||||
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
|
||||
{
|
||||
request.getHeader(name) match {
|
||||
|
||||
case null => default(null)
|
||||
case s => s
|
||||
}
|
||||
}
|
||||
|
||||
def getParameterOrElse(name: String, default: Function[Any, String]): String =
|
||||
{
|
||||
request.getParameter(name) match {
|
||||
|
||||
case null => default(null)
|
||||
case s => s
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]())
|
||||
|
||||
def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean =
|
||||
{
|
||||
var ok = false
|
||||
context match {
|
||||
|
||||
case Some(pipe) => {
|
||||
try
|
||||
{
|
||||
if (!suspended)
|
||||
{
|
||||
log.warning("Attempt to complete an expired connection.")
|
||||
}
|
||||
else
|
||||
{
|
||||
response.setStatus(status)
|
||||
headers foreach {h => response.setHeader(h._1, h._2)}
|
||||
response.getWriter.write(body)
|
||||
response.getWriter.close
|
||||
response.flushBuffer
|
||||
pipe.complete
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
case io:IOException => log.error(io, "Failed to write data to connection on resume - the client probably disconnected")
|
||||
}
|
||||
}
|
||||
|
||||
case None => {
|
||||
log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")")
|
||||
}
|
||||
}
|
||||
ok
|
||||
}
|
||||
|
||||
def complete(t: Throwable): Unit =
|
||||
{
|
||||
var status = 0
|
||||
context match {
|
||||
|
||||
case Some(pipe) => {
|
||||
try
|
||||
{
|
||||
if (!suspended)
|
||||
{
|
||||
log.warning("Attempt to complete an expired connection.")
|
||||
}
|
||||
else
|
||||
{
|
||||
status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR
|
||||
response.sendError(status, "Failed to write data to connection on resume")
|
||||
pipe.complete
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
case io:IOException => log.error(io, "Request completed with internal error.")
|
||||
}
|
||||
finally
|
||||
{
|
||||
log.error(t, "Request completed with internal error.")
|
||||
}
|
||||
}
|
||||
|
||||
case None => {
|
||||
log.error(t, "Attempt to complete request with no context")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
|
||||
def OK(body: String, headers:List[Tuple2[String,String]]): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
|
||||
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)
|
||||
def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body)
|
||||
def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body)
|
||||
def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body)
|
||||
def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body)
|
||||
def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body)
|
||||
def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body)
|
||||
def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body)
|
||||
def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body)
|
||||
def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body)
|
||||
def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body)
|
||||
def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body)
|
||||
def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body)
|
||||
def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString)))
|
||||
}
|
||||
|
||||
abstract class Delete(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
|
||||
abstract class Get(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
|
||||
abstract class Head(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
|
||||
abstract class Options(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
|
||||
abstract class Post(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
|
||||
abstract class Put(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
|
||||
abstract class Trace(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
|
||||
|
||||
trait RequestMethodFactory
|
||||
{
|
||||
def Delete(f:(()=>tAsyncRequestContext)):RequestMethod
|
||||
def Get(f:(()=>tAsyncRequestContext)):RequestMethod
|
||||
def Head(f:(()=>tAsyncRequestContext)):RequestMethod
|
||||
def Options(f:(()=>tAsyncRequestContext)):RequestMethod
|
||||
def Post(f:(()=>tAsyncRequestContext)):RequestMethod
|
||||
def Put(f:(()=>tAsyncRequestContext)):RequestMethod
|
||||
def Trace(f:(()=>tAsyncRequestContext)):RequestMethod
|
||||
}
|
||||
|
|
@ -1,252 +0,0 @@
|
|||
/**
|
||||
* Copyright 2010 Autodesk, Inc. All rights reserved.
|
||||
* Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.
|
||||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package akka.http
|
||||
|
||||
|
||||
import akka.util.Logging
|
||||
import Types._
|
||||
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait SuspendedRequest extends Logging
|
||||
{
|
||||
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
||||
import org.eclipse.jetty.server._
|
||||
import org.eclipse.jetty.continuation._
|
||||
|
||||
final val Timeout = "timeout"
|
||||
final val DefaultTimeout = 30000
|
||||
|
||||
var context: Option[tAsyncContext] = None
|
||||
|
||||
def init(suspend:()=>Option[tAsyncContext], callback:()=>Option[AnyRef]) =
|
||||
{
|
||||
suspend() match
|
||||
{
|
||||
case (Some(continuation)) =>
|
||||
{
|
||||
context = Some(continuation)
|
||||
val ac = continuation.asInstanceOf[AsyncContinuation]
|
||||
|
||||
(ac.isInitial, ac.isSuspended, ac.isExpired) match
|
||||
{
|
||||
//
|
||||
// the fresh continuation
|
||||
//
|
||||
case (true, false, false) =>
|
||||
{
|
||||
ac.setTimeout(DefaultTimeout)
|
||||
|
||||
//callback() foreach {listener => ac.addContinuationListener(listener)}
|
||||
//ac.addContinuationListener(this)
|
||||
ac.suspend
|
||||
}
|
||||
//
|
||||
// the timeout was reset and the continuation was resumed
|
||||
// need to update the timeout and resuspend
|
||||
// very important to clear the context so the request is not rebroadcast to the endpoint
|
||||
//
|
||||
case (false, false, false) =>
|
||||
{
|
||||
ac.setTimeout(ac.getAttribute(Timeout).asInstanceOf[Long])
|
||||
ac.suspend
|
||||
ac.removeAttribute(Timeout)
|
||||
|
||||
context = None
|
||||
log.debug("Updating and re-suspending request. TIMEOUT ("+ac.getTimeout+" ms)")
|
||||
}
|
||||
//
|
||||
// we don't actually expect to get this one here since the listener will finish him off
|
||||
//
|
||||
case (_, _, true) =>
|
||||
{
|
||||
response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT)
|
||||
context = None
|
||||
log.warning("Expired request arrived here unexpectedly. REQUEST ("+continuation.toString+")")
|
||||
}
|
||||
case unknown =>
|
||||
{
|
||||
log.error("Unexpected continuation state detected - cancelling")
|
||||
ac.cancel
|
||||
context = None
|
||||
}
|
||||
}
|
||||
}
|
||||
case _ =>
|
||||
{
|
||||
log.error("Cannot initialize request without an asynchronous context.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def request = context.get.getRequest.asInstanceOf[HttpServletRequest]
|
||||
def response = context.get.getResponse.asInstanceOf[HttpServletResponse]
|
||||
def suspended =
|
||||
{
|
||||
context match
|
||||
{
|
||||
case Some(continuation) =>
|
||||
{
|
||||
val ac = continuation.asInstanceOf[AsyncContinuation]
|
||||
(ac.isSuspended || (ac.getAttribute(Timeout) != null))
|
||||
}
|
||||
case None => false
|
||||
}
|
||||
}
|
||||
|
||||
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
|
||||
{
|
||||
request.getHeader(name) match
|
||||
{
|
||||
case null => default(null)
|
||||
case s => s
|
||||
}
|
||||
}
|
||||
|
||||
def getParameterOrElse(name: String, default: Function[Any, String]): String =
|
||||
{
|
||||
request.getParameter(name) match
|
||||
{
|
||||
case null => default(null)
|
||||
case s => s
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allow for an updatable timeout
|
||||
*/
|
||||
def timeout(ms:Long):Unit =
|
||||
{
|
||||
context match
|
||||
{
|
||||
case Some(continuation) =>
|
||||
{
|
||||
continuation.asInstanceOf[AsyncContinuation].setAttribute(Timeout, ms)
|
||||
continuation.asInstanceOf[AsyncContinuation].resume
|
||||
}
|
||||
case None => log.error("Cannot update the timeout on an unsuspended request")
|
||||
}
|
||||
}
|
||||
|
||||
def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]())
|
||||
|
||||
def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean =
|
||||
{
|
||||
var ok = false
|
||||
context match
|
||||
{
|
||||
case Some(pipe) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!suspended)
|
||||
{
|
||||
log.warning("Attempt to complete an expired connection.")
|
||||
}
|
||||
else
|
||||
{
|
||||
response.setStatus(status)
|
||||
headers foreach {h => response.setHeader(h._1, h._2)}
|
||||
response.getWriter.write(body)
|
||||
response.getWriter.close
|
||||
response.flushBuffer
|
||||
pipe.complete
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
case ex => log.error(ex, "Failed to write data to connection on resume - the client probably disconnected")
|
||||
}
|
||||
finally
|
||||
{
|
||||
context = None
|
||||
}
|
||||
}
|
||||
case None =>
|
||||
{
|
||||
log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")")
|
||||
}
|
||||
}
|
||||
ok
|
||||
}
|
||||
|
||||
def complete(t: Throwable): Unit =
|
||||
{
|
||||
var status = 0
|
||||
context match
|
||||
{
|
||||
case Some(pipe) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!suspended)
|
||||
{
|
||||
log.warning("Attempt to complete an expired connection.")
|
||||
}
|
||||
else
|
||||
{
|
||||
status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR
|
||||
response.sendError(status, "Failed to write data to connection on resume")
|
||||
pipe.complete
|
||||
}
|
||||
}
|
||||
catch
|
||||
{
|
||||
case ex => log.error(ex, "Request completed with internal error.")
|
||||
}
|
||||
finally
|
||||
{
|
||||
context = None
|
||||
log.error(t, "Request completed with internal error.")
|
||||
}
|
||||
}
|
||||
case None =>
|
||||
{
|
||||
log.error(t, "Attempt to complete request with no context")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def onComplete(c:Continuation) = {}
|
||||
def onTimeout(c:Continuation) =
|
||||
{
|
||||
c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader("Suspend","Timeout")
|
||||
c.complete
|
||||
log.debug("Request expired. CONTEXT ("+c+")")
|
||||
}
|
||||
|
||||
|
||||
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
|
||||
def OK(body: String, headers:List[Tuple2[String,String]]): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
|
||||
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)
|
||||
def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body)
|
||||
def NotModified(body:String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body)
|
||||
def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body)
|
||||
def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body)
|
||||
def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body)
|
||||
def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body)
|
||||
def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body)
|
||||
def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body)
|
||||
def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body)
|
||||
def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body)
|
||||
def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body)
|
||||
def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body)
|
||||
def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString)))
|
||||
}
|
||||
|
||||
|
||||
case class Delete(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
|
||||
case class Get(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
|
||||
case class Head(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
|
||||
case class Options(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
|
||||
case class Post(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
|
||||
case class Put(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
|
||||
case class Trace(f:(()=>Option[tAsyncContext]), g:(()=>Option[AnyRef])) extends SuspendedRequest {init(f, g)}
|
||||
|
||||
|
|
@ -14,37 +14,18 @@ package akka.http
|
|||
*/
|
||||
object Types
|
||||
{
|
||||
import javax.servlet. {ServletContext, ServletRequest, ServletResponse}
|
||||
import javax.servlet. {ServletRequest, ServletResponse}
|
||||
|
||||
type tAsyncRequest = {
|
||||
def startAsync:tAsyncContext
|
||||
def startAsync:tAsyncRequestContext
|
||||
}
|
||||
|
||||
type tAsyncContext = {
|
||||
/**
|
||||
* Used to match both AsyncContext and AsyncContinuation in order to complete the request
|
||||
*/
|
||||
type tAsyncRequestContext = {
|
||||
def complete:Unit
|
||||
def dispatch:Unit
|
||||
def dispatch(s:String):Unit
|
||||
def dispatch(c:ServletContext, s:String)
|
||||
def getRequest:ServletRequest
|
||||
def getResponse:ServletResponse
|
||||
def hasOriginalRequestAndResponse:Boolean
|
||||
def setTimeout(ms:Long):Unit
|
||||
def start(r:Runnable):Unit
|
||||
}
|
||||
|
||||
type tContinuation = {
|
||||
def complete:Unit
|
||||
def isExpired:Boolean
|
||||
def isInitial:Boolean
|
||||
def isResumed:Boolean
|
||||
def isSuspended:Boolean
|
||||
def resume:Unit
|
||||
def suspend:Unit
|
||||
def undispatch:Unit
|
||||
}
|
||||
|
||||
type tContinuationListener = {
|
||||
def onComplete(c:tContinuation)
|
||||
def onTimeout(c:tContinuation)
|
||||
}
|
||||
}
|
||||
|
|
@ -4,7 +4,7 @@
|
|||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package sample.mist
|
||||
package sample.rest.scala
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
|
|
@ -19,27 +19,16 @@ class Boot {
|
|||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
//
|
||||
// in this particular case, just boot the built-in default root endpoint
|
||||
//
|
||||
Supervise(
|
||||
actorOf[ServiceRoot],
|
||||
actorOf[RootEndpoint],
|
||||
Permanent) ::
|
||||
Supervise(
|
||||
actorOf[SimpleService],
|
||||
actorOf[SimpleAkkaAsyncHttpService],
|
||||
Permanent)
|
||||
:: Nil))
|
||||
factory.newInstance.start
|
||||
}
|
||||
|
||||
|
||||
class ServiceRoot extends RootEndpoint
|
||||
{
|
||||
//
|
||||
// use the configurable dispatcher
|
||||
//
|
||||
self.dispatcher = Endpoint.Dispatcher
|
||||
|
||||
//
|
||||
// TODO: make this a config prop
|
||||
//
|
||||
self.id = "DefaultRootEndpoint"
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
|
||||
*/
|
||||
|
||||
package sample.mist
|
||||
package sample.rest.scala
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
|
|
@ -18,7 +18,7 @@ import akka.http._
|
|||
*
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
class SimpleService extends Actor with Endpoint
|
||||
class SimpleAkkaAsyncHttpService extends Actor with Endpoint
|
||||
{
|
||||
final val ServiceRoot = "/simple/"
|
||||
final val ProvideSameActor = ServiceRoot + "same"
|
||||
|
|
@ -147,9 +147,9 @@ class BoringActor extends Actor
|
|||
}
|
||||
}
|
||||
|
||||
case other if other.isInstanceOf[SuspendedRequest] =>
|
||||
case other if other.isInstanceOf[RequestMethod] =>
|
||||
{
|
||||
other.asInstanceOf[SuspendedRequest].NotAllowed("Invalid method for this endpoint")
|
||||
other.asInstanceOf[RequestMethod].NotAllowed("Invalid method for this endpoint")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -22,7 +22,7 @@ import org.atmosphere.util.XSSHtmlFilter
|
|||
import org.atmosphere.cpr.{Broadcaster, BroadcastFilter}
|
||||
import org.atmosphere.jersey.Broadcastable
|
||||
|
||||
class Boot {
|
||||
class BootPrev {
|
||||
val factory = SupervisorFactory(
|
||||
SupervisorConfig(
|
||||
OneForOneStrategy(List(classOf[Exception]), 3, 100),
|
||||
|
|
|
|||
|
|
@ -99,6 +99,13 @@ akka {
|
|||
}
|
||||
# maxInactiveActivity = 60000 # Atmosphere CometSupport maxInactiveActivity
|
||||
|
||||
connection-close = true # toggles the addition of the "Connection" response header with a "close" value
|
||||
root-actor-id = "_httproot" # the id of the actor to use as the root endpoint
|
||||
root-actor-builtin = true # toggles the use of the built-in root endpoint base class
|
||||
timeout = 1000 # the default timeout for all async requests (in ms)
|
||||
expired-header-name = "Async-Timeout" # the name of the response header to use when an async request expires
|
||||
expired-header-value = "expired" # the value of the response header to use when an async request expires
|
||||
|
||||
# Uncomment if you are using the KerberosAuthenticationActor
|
||||
# kerberos {
|
||||
# servicePrincipal = "HTTP/localhost@EXAMPLE.COM"
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@
|
|||
<New id="AkkaRestHandler" class="org.eclipse.jetty.servlet.ServletContextHandler">
|
||||
<Set name="contextPath">/</Set>
|
||||
<Call name="addServlet">
|
||||
<Arg>akka.comet.AkkaServlet</Arg>
|
||||
<Arg>akka.http.AkkaHttpServlet</Arg>
|
||||
<Arg>/*</Arg>
|
||||
</Call>
|
||||
</New>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue