Merge remote-tracking branch 'origin/master' into ticket-622

This commit is contained in:
Derek Williams 2011-03-30 22:07:13 -06:00
commit fdb76eb8d0

View file

@ -4,7 +4,7 @@
package akka.http
import akka.actor.{ActorRegistry, ActorRef, Actor}
import akka.actor.{ActorRef, Actor}
import akka.event.EventHandler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
@ -17,8 +17,8 @@ import javax.servlet.Filter
object MistSettings {
import akka.config.Config._
final val JettyServer = "jetty"
final val TimeoutAttribute = "timeout"
val JettyServer = "jetty"
val TimeoutAttribute = "timeout"
val ConnectionClose = config.getBool("akka.http.connection-close", true)
val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
@ -64,7 +64,7 @@ import Types._
*
*/
trait Mist {
import javax.servlet.{ServletContext}
import javax.servlet.ServletContext
import MistSettings._
/**
@ -84,28 +84,21 @@ trait Mist {
response: HttpServletResponse)
(builder: (() => tAsyncRequestContext) => RequestMethod) = {
def suspend: tAsyncRequestContext = {
//
// set to right now, which is effectively "already expired"
//
response.setDateHeader("Expires", System.currentTimeMillis)
response.setHeader("Cache-Control", "no-cache, must-revalidate")
//
// no keep-alive?
//
if (ConnectionClose) response.setHeader("Connection","close")
//
// suspend the request
// TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs
//
request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext]
}
//
// shoot the message to the root endpoint for processing
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
//
val method = builder(suspend _)
if (method.go) _root ! method
}
@ -117,7 +110,6 @@ trait Mist {
def initMist(context: ServletContext) {
val server = context.getServerInfo
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
_factory = if (major >= 3) {
Some(Servlet30ContextMethodFactory)
} else if (server.toLowerCase startsWith JettyServer) {
@ -200,7 +192,7 @@ object Endpoint {
/**
* leverage the akka config to tweak the dispatcher for our endpoints
*/
final val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
type Hook = Function[String, Boolean]
type Provider = Function[String, ActorRef]
@ -236,25 +228,21 @@ trait Endpoint { this: Actor =>
* Message handling common to all endpoints, must be chained
*/
protected def handleHttpRequest: Receive = {
//
// add the endpoint - the if the uri hook matches,
// the message will be sent to the actor returned by the provider func
//
case Attach(hook, provider) => _attach(hook, provider)
//
// dispatch the suspended requests
//
case req: RequestMethod => {
val uri = req.request.getPathInfo
val endpoints = _attachments.filter { _._1(uri) }
if (!endpoints.isEmpty)
endpoints.foreach { _._2(uri) ! req }
if (!endpoints.isEmpty) endpoints.foreach { _._2(uri) ! req }
else {
self.sender match {
case Some(s) => s reply NoneAvailable(uri, req)
case None => _na(uri, req)
case None => _na(uri, req)
}
}
}
@ -275,23 +263,15 @@ class RootEndpoint extends Actor with Endpoint {
final val Root = "/"
//
// use the configurable dispatcher
//
self.dispatcher = Endpoint.Dispatcher
//
// adopt the configured id
//
if (RootActorBuiltin) self.id = RootActorID
override def preStart =
_attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
//TODO: Is this needed?
//override def postRestart =
// _attachments = Tuple2((uri: String) => {uri eq Root}, (uri: String) => this.actor) :: _attachments
def recv: Receive = {
case NoneAvailable(uri, req) => _na(uri, req)
case unknown => {}
@ -317,10 +297,7 @@ trait RequestMethod {
import java.io.IOException
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
//
// required implementations
//
val builder: () => tAsyncRequestContext
/**
@ -353,35 +330,31 @@ trait RequestMethod {
def getHeaderOrElse(name: String, default: Function[Any, String]): String =
request.getHeader(name) match {
case null => default(null)
case s => s
}
case s => s
}
def getParameterOrElse(name: String, default: Function[Any, String]): String =
request.getParameter(name) match {
case null => default(null)
case s => s
case s => s
}
def complete(status: Int, body: String): Boolean = complete(status, body, Headers())
def complete(status: Int, body: String, headers: Headers): Boolean =
rawComplete {
res => {
res.setStatus(status)
headers foreach {h => response.setHeader(h._1, h._2)}
res.getWriter.write(body)
res.getWriter.close
res.flushBuffer
}
rawComplete { res =>
res.setStatus(status)
headers foreach {h => response.setHeader(h._1, h._2)}
res.getWriter.write(body)
res.getWriter.close
res.flushBuffer
}
def rawComplete(completion: HttpServletResponse => Unit): Boolean =
context match {
case Some(pipe) => {
case Some(pipe) =>
try {
if (!suspended) {
false
}
if (!suspended) false
else {
completion(response)
pipe.complete
@ -392,34 +365,28 @@ trait RequestMethod {
EventHandler.error(io, this, io.getMessage)
false
}
}
case None =>
false
case None => false
}
def complete(t: Throwable) {
context match {
case Some(pipe) => {
case Some(pipe) =>
try {
if (suspended) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume")
pipe.complete
}
} catch {
case io: IOException =>
case io: IOException =>
EventHandler.error(io, this, io.getMessage)
}
}
case None => {}
}
}
/**
/*
* Utility methods to send responses back
*/
def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body)
def OK(body: String, headers:Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers)
def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body)