Minor code tweaks, removing Atmosphere, awaiting some tests then ready for master

This commit is contained in:
Viktor Klang 2010-11-22 13:21:55 +01:00
parent dcbdfcc67b
commit e802b336a1
8 changed files with 148 additions and 235 deletions

View file

@ -36,17 +36,14 @@ class AkkaHttpServlet extends HttpServlet with Logging
/**
* Handles all servlet requests
*/
protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder:(()=>tAsyncRequestContext)=>RequestMethod) =
protected def _do(request:HttpServletRequest, response:HttpServletResponse)(builder:(() => tAsyncRequestContext) => RequestMethod) =
{
def suspend:tAsyncRequestContext =
{
def suspend:tAsyncRequestContext = {
//
// set to right now, which is effectively "already expired"
//
val gmt = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
gmt.setTimeZone(TimeZone.getTimeZone("GMT"))
response.setHeader("Expires", gmt.format(new Date))
response.setDateHeader("Expires", System.currentTimeMillis)
response.setHeader("Cache-Control", "no-cache, must-revalidate")
//
@ -66,7 +63,7 @@ class AkkaHttpServlet extends HttpServlet with Logging
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
//
val method = builder(suspend _)
if (method.go) {_root ! method}
if (method.go) _root ! method
}
@ -103,13 +100,13 @@ class AkkaHttpServlet extends HttpServlet with Logging
}
}
protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Delete)
protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Get)
protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Head)
protected override def doDelete(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Delete)
protected override def doGet(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Get)
protected override def doHead(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Head)
protected override def doOptions(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Options)
protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Post)
protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Put)
protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response) (_factory.get.Trace)
protected override def doPost(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Post)
protected override def doPut(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Put)
protected override def doTrace(request: HttpServletRequest, response: HttpServletResponse) = _do(request, response)(_factory.get.Trace)
}
object AkkaHttpServlet

View file

@ -10,6 +10,7 @@ import java.io.File
import akka.actor.BootableActorLoaderService
import akka.util.{Bootable, Logging}
//import akka.comet.AkkaServlet
import org.eclipse.jetty.xml.XmlConfiguration

View file

@ -18,9 +18,6 @@ trait Endpoint
import Endpoint._
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
/**
* A convenience method to get the actor ref
*/
@ -35,54 +32,44 @@ trait Endpoint
/**
*
*/
protected def _attach(hook:Hook, provider:Provider) =
{
_attachments = (hook, provider) :: _attachments
}
protected def _attach(hook:Hook, provider:Provider) = _attachments = (hook, provider) :: _attachments
/**
* Message handling common to all endpoints, must be chained
*/
protected def _recv: Receive =
{
//
// add the endpoint - the if the uri hook matches,
// the message will be sent to the actor returned by the provider func
//
case Attach(hook, provider) => _attach(hook, provider)
protected def _recv: Receive = {
//
// add the endpoint - the if the uri hook matches,
// the message will be sent to the actor returned by the provider func
//
case Attach(hook, provider) => _attach(hook, provider)
//
// dispatch the suspended requests
//
case msg if msg.isInstanceOf[RequestMethod] => {
val req = msg.asInstanceOf[RequestMethod]
val uri = req.request.getRequestURI
val endpoints = _attachments.filter { _._1(uri) }
//
// dispatch the suspended requests
//
case msg if msg.isInstanceOf[RequestMethod] =>
{
val req = msg.asInstanceOf[RequestMethod]
val uri = req.request.getRequestURI
val endpoints = _attachments.filter {_._1(uri)}
if (endpoints.size > 0)
endpoints.foreach {_._2(uri) ! req}
else
{
self.sender match
{
case Some(s) => s reply NoneAvailable(uri, req)
case None => _na(uri, req)
}
}
if (!endpoints.isEmpty)
endpoints.foreach { _._2(uri) ! req }
else {
self.sender match {
case Some(s) => s reply NoneAvailable(uri, req)
case None => _na(uri, req)
}
}
}
}
/**
* no endpoint available - completes the request with a 404
*/
protected def _na(uri: String, req: RequestMethod) =
{
req.NotFound("No endpoint available for [" + uri + "]")
log.debug("No endpoint available for [" + uri + "]")
}
protected def _na(uri: String, req: RequestMethod) = {
req.NotFound("No endpoint available for [" + uri + "]")
log.debug("No endpoint available for [" + uri + "]")
}
}
@ -105,19 +92,16 @@ class RootEndpoint extends Actor with Endpoint
override def preStart = _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
def recv: Receive =
{
case NoneAvailable(uri, req) => _na(uri, req)
case unknown =>
{
log.error("Unexpected message sent to root endpoint. [" + unknown.toString + "]")
}
}
def recv: Receive = {
case NoneAvailable(uri, req) => _na(uri, req)
case unknown =>
log.error("Unexpected message sent to root endpoint. [" + unknown.toString + "]")
}
/**
* Note that root is a little different, other endpoints should chain their own recv first
*/
def receive = {_recv orElse recv}
def receive = _recv orElse recv
}
@ -132,7 +116,7 @@ object Endpoint
*/
final val Dispatcher = Dispatchers.fromConfig("akka.rest.comet-dispatcher")
type Hook = Function[String, Boolean]
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
case class Attach(hook: Hook, provider: Provider)

View file

@ -19,9 +19,9 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging
import javax.servlet.http.HttpServletResponse
import AkkaHttpServlet._
val builder:()=>tAsyncRequestContext
val context:Option[tAsyncRequestContext] = Some(builder())
def go = {_continuation.isDefined}
val builder:() => tAsyncRequestContext
val context: Option[tAsyncRequestContext] = Some(builder())
def go = _continuation.isDefined
protected val _continuation:Option[AsyncContinuation] = {
@ -31,36 +31,30 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging
continuation.isSuspended,
continuation.isExpired) match {
//
// the fresh continuation (coming through getAsyncContinuation)
//
//
// the fresh continuation (coming through getAsyncContinuation)
//
case (true, false, false) => {
continuation.setTimeout(DefaultTimeout)
//callback() foreach {listener => continuation.addContinuationListener(listener.asInstanceOf[ContinuationListener])}
continuation.addContinuationListener(this)
continuation.suspend
Some(continuation)
}
//
// the fresh continuation (coming through startAsync instead)
//
//
// the fresh continuation (coming through startAsync instead)
//
case (true, true, false) => {
continuation.setTimeout(DefaultTimeout)
//callback() foreach {listener => continuation.addContinuationListener(listener.asInstanceOf[ContinuationListener])}
continuation.addContinuationListener(this)
Some(continuation)
}
//
// the timeout was reset and the continuation was resumed
// need to update the timeout and resuspend
// very important to clear the context so the request is not rebroadcast to the endpoint
//
// the timeout was reset and the continuation was resumed
// need to update the timeout and resuspend
// very important to clear the context so the request is not rebroadcast to the endpoint
case (false, false, false) => {
continuation.setTimeout(continuation.getAttribute(TimeoutAttribute).asInstanceOf[Long])
@ -69,58 +63,41 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging
None
}
//
// we don't actually expect to get this one here since the listener will finish him off
//
//
// we don't actually expect to get this one here since the listener will finish him off
//
case (_, _, true) => {
None
}
//
// snuh?
//
//
// snuh?
//
case _ => {
continuation.cancel
None
}
}
}
def suspended:Boolean =
{
_continuation match {
case Some(continuation) => (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) != null))
case None => {
false
}
}
def suspended:Boolean = _continuation match {
case None => false
case Some(continuation) => (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) ne null))
}
def timeout(ms:Long):Boolean =
{
_continuation match {
case Some(continuation) => {
continuation.setAttribute(TimeoutAttribute, ms)
continuation.resume
true
}
case None => false
}
}
def timeout(ms:Long):Boolean = _continuation match {
case None => false
case Some(continuation) =>
continuation.setAttribute(TimeoutAttribute, ms)
continuation.resume
true
}
//
// ContinuationListener
//
def onComplete(c:Continuation) = {}
def onTimeout(c:Continuation) =
{
def onComplete(c: Continuation) = {}
def onTimeout(c: Continuation) = {
c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
c.complete
}
@ -128,13 +105,13 @@ trait JettyContinuation extends ContinuationListener with akka.util.Logging
object JettyContinuationMethodFactory extends RequestMethodFactory
{
def Delete(f:(()=>tAsyncRequestContext)):RequestMethod = {new Delete(f) with JettyContinuation}
def Get(f:(()=>tAsyncRequestContext)):RequestMethod = {new Get(f) with JettyContinuation}
def Head(f:(()=>tAsyncRequestContext)):RequestMethod = {new Head(f) with JettyContinuation}
def Options(f:(()=>tAsyncRequestContext)):RequestMethod = {new Options(f) with JettyContinuation}
def Post(f:(()=>tAsyncRequestContext)):RequestMethod = {new Post(f) with JettyContinuation}
def Put(f:(()=>tAsyncRequestContext)):RequestMethod = {new Put(f) with JettyContinuation}
def Trace(f:(()=>tAsyncRequestContext)):RequestMethod = {new Trace(f) with JettyContinuation}
def Delete(f: () => tAsyncRequestContext): RequestMethod = new Delete(f) with JettyContinuation
def Get(f: () => tAsyncRequestContext): RequestMethod = new Get(f) with JettyContinuation
def Head(f: () => tAsyncRequestContext): RequestMethod = new Head(f) with JettyContinuation
def Options(f: () => tAsyncRequestContext): RequestMethod = new Options(f) with JettyContinuation
def Post(f: () => tAsyncRequestContext): RequestMethod = new Post(f) with JettyContinuation
def Put(f: () => tAsyncRequestContext): RequestMethod = new Put(f) with JettyContinuation
def Trace(f: () => tAsyncRequestContext): RequestMethod = new Trace(f) with JettyContinuation
}

View file

@ -26,14 +26,14 @@ trait RequestMethod extends Logging
// required implementations
//
val builder:()=>tAsyncRequestContext
val builder:() => tAsyncRequestContext
/**
* Provides a general type for the underlying context
*
* @return a completable request context
*/
val context:Option[tAsyncRequestContext]
val context: Option[tAsyncRequestContext]
def go:Boolean
/**
@ -41,12 +41,12 @@ trait RequestMethod extends Logging
*
* @return true if updated, false if not supported
*/
def timeout(ms:Long):Boolean
def timeout(ms: Long): Boolean
/**
* Status of the suspension
*/
def suspended:Boolean
def suspended: Boolean
//
// convenience funcs
@ -56,94 +56,71 @@ trait RequestMethod extends Logging
def response = context.get.getResponse.asInstanceOf[HttpServletResponse]
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
{
request.getHeader(name) match {
case null => default(null)
case s => s
}
}
case s => s
}
def getParameterOrElse(name: String, default: Function[Any, String]): String =
{
request.getParameter(name) match {
case null => default(null)
case s => s
}
}
def complete(status: Int, body: String): Boolean = complete(status, body, List[Tuple2[String, String]]())
def complete(status: Int, body: String, headers: List[Tuple2[String, String]]): Boolean =
{
var ok = false
context match {
case Some(pipe) => {
try
{
if (!suspended)
{
try {
if (!suspended) {
log.warning("Attempt to complete an expired connection.")
false
}
else
{
else {
response.setStatus(status)
headers foreach {h => response.setHeader(h._1, h._2)}
response.getWriter.write(body)
response.getWriter.close
response.flushBuffer
pipe.complete
ok = true
true
}
} catch {
case io =>
log.error(io, "Failed to write data to connection on resume - the client probably disconnected")
false
}
catch
{
case io => log.error(io, "Failed to write data to connection on resume - the client probably disconnected")
}
}
}
case None => {
log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")")
}
case None =>
log.error("Attempt to complete request with no context. STATUS (" + status + ") BODY (" + body + ") HEADERS (" + headers + ")")
false
}
ok
}
def complete(t: Throwable): Unit =
{
var status = 0
def complete(t: Throwable) {
context match {
case Some(pipe) => {
try
{
if (!suspended)
{
try {
if (!suspended) {
log.warning("Attempt to complete an expired connection.")
}
else
{
status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR
response.sendError(status, "Failed to write data to connection on resume")
else {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
pipe.complete
}
}
catch
{
catch {
case io:IOException => log.error(io, "Request completed with internal error.")
}
finally
{
finally {
log.error(t, "Request completed with internal error.")
}
}
case None => {
case None =>
log.error(t, "Attempt to complete request with no context")
}
}
}
@ -166,21 +143,21 @@ trait RequestMethod extends Logging
def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString)))
}
abstract class Delete(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
abstract class Get(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
abstract class Head(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
abstract class Options(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
abstract class Post(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
abstract class Put(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
abstract class Trace(f:(()=>tAsyncRequestContext)) extends RequestMethod {val builder = f}
abstract class Delete(val builder: () => tAsyncRequestContext) extends RequestMethod
abstract class Get(val builder: () => tAsyncRequestContext) extends RequestMethod
abstract class Head(val builder: () => tAsyncRequestContext) extends RequestMethod
abstract class Options(val builder: () => tAsyncRequestContext) extends RequestMethod
abstract class Post(val builder: () => tAsyncRequestContext) extends RequestMethod
abstract class Put(val builder: () => tAsyncRequestContext) extends RequestMethod
abstract class Trace(val builder: () => tAsyncRequestContext) extends RequestMethod
trait RequestMethodFactory
{
def Delete(f:(()=>tAsyncRequestContext)):RequestMethod
def Get(f:(()=>tAsyncRequestContext)):RequestMethod
def Head(f:(()=>tAsyncRequestContext)):RequestMethod
def Options(f:(()=>tAsyncRequestContext)):RequestMethod
def Post(f:(()=>tAsyncRequestContext)):RequestMethod
def Put(f:(()=>tAsyncRequestContext)):RequestMethod
def Trace(f:(()=>tAsyncRequestContext)):RequestMethod
def Delete(f: () => tAsyncRequestContext): RequestMethod
def Get(f: () => tAsyncRequestContext): RequestMethod
def Head(f: () => tAsyncRequestContext): RequestMethod
def Options(f: () => tAsyncRequestContext): RequestMethod
def Post(f: () => tAsyncRequestContext): RequestMethod
def Put(f: () => tAsyncRequestContext): RequestMethod
def Trace(f: () => tAsyncRequestContext): RequestMethod
}

View file

@ -18,63 +18,54 @@ trait Servlet30Context extends AsyncListener with akka.util.Logging
import javax.servlet.http.HttpServletResponse
import AkkaHttpServlet._
val builder:()=>tAsyncRequestContext
val context:Option[tAsyncRequestContext] = Some(builder())
def go = {context.isDefined}
val builder: () => tAsyncRequestContext
val context: Option[tAsyncRequestContext] = Some(builder())
def go = context.isDefined
protected val _ac:AsyncContext = {
protected val _ac: AsyncContext = {
val ac = context.get.asInstanceOf[AsyncContext]
ac.setTimeout(DefaultTimeout)
ac.addListener(this)
ac setTimeout DefaultTimeout
ac addListener this
ac
}
def suspended:Boolean = true
def suspended = true
def timeout(ms:Long):Boolean =
{
def timeout(ms:Long):Boolean = {
try {
_ac.setTimeout(ms)
_ac setTimeout ms
true
}
catch {
case ex:IllegalStateException => {
case ex:IllegalStateException =>
log.info("Cannot update timeout - already returned to container")
false
}
}
}
//
// AsyncListener
//
def onComplete(e:AsyncEvent) = {}
def onError(e:AsyncEvent) =
{
e.getThrowable match {
case null => log.warning("Error occured...")
case t => log.warning(t, "Error occured")
}
def onError(e:AsyncEvent) = e.getThrowable match {
case null => log.warning("Error occured...")
case t => log.warning(t, "Error occured")
}
def onStartAsync(e:AsyncEvent) = {}
def onTimeout(e:AsyncEvent) =
{
def onTimeout(e:AsyncEvent) = {
e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
e.getAsyncContext.complete
}
}
object Servlet30ContextMethodFactory extends RequestMethodFactory
{
def Delete(f:(()=>tAsyncRequestContext)):RequestMethod = {new Delete(f) with Servlet30Context}
def Get(f:(()=>tAsyncRequestContext)):RequestMethod = {new Get(f) with Servlet30Context}
def Head(f:(()=>tAsyncRequestContext)):RequestMethod = {new Head(f) with Servlet30Context}
def Options(f:(()=>tAsyncRequestContext)):RequestMethod = {new Options(f) with Servlet30Context}
def Post(f:(()=>tAsyncRequestContext)):RequestMethod = {new Post(f) with Servlet30Context}
def Put(f:(()=>tAsyncRequestContext)):RequestMethod = {new Put(f) with Servlet30Context}
def Trace(f:(()=>tAsyncRequestContext)):RequestMethod = {new Trace(f) with Servlet30Context}
object Servlet30ContextMethodFactory extends RequestMethodFactory {
def Delete(f: () => tAsyncRequestContext): RequestMethod = new Delete(f) with Servlet30Context
def Get(f: () => tAsyncRequestContext): RequestMethod = new Get(f) with Servlet30Context
def Head(f: () => tAsyncRequestContext): RequestMethod = new Head(f) with Servlet30Context
def Options(f: () => tAsyncRequestContext): RequestMethod = new Options(f) with Servlet30Context
def Post(f: () => tAsyncRequestContext): RequestMethod = new Post(f) with Servlet30Context
def Put(f: () => tAsyncRequestContext): RequestMethod = new Put(f) with Servlet30Context
def Trace(f: () => tAsyncRequestContext): RequestMethod = new Trace(f) with Servlet30Context
}

View file

@ -17,15 +17,15 @@ object Types
import javax.servlet. {ServletRequest, ServletResponse}
type tAsyncRequest = {
def startAsync:tAsyncRequestContext
def startAsync: tAsyncRequestContext
}
/**
* Used to match both AsyncContext and AsyncContinuation in order to complete the request
*/
type tAsyncRequestContext = {
def complete:Unit
def getRequest:ServletRequest
def getResponse:ServletResponse
def complete: Unit
def getRequest: ServletRequest
def getResponse: ServletResponse
}
}