Merge branch 'master' of https://github.com/jboner/akka
This commit is contained in:
commit
e2c46cc2b8
4 changed files with 54 additions and 62 deletions
|
|
@ -9,13 +9,15 @@ package akka.http
|
|||
import org.eclipse.jetty.server._
|
||||
import org.eclipse.jetty.continuation._
|
||||
import Types._
|
||||
import akka.AkkaApplication
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait JettyContinuation extends ContinuationListener {
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
import MistSettings._
|
||||
|
||||
def app: AkkaApplication
|
||||
|
||||
val builder: () ⇒ tAsyncRequestContext
|
||||
val context: Option[tAsyncRequestContext] = Some(builder())
|
||||
|
|
@ -33,7 +35,7 @@ trait JettyContinuation extends ContinuationListener {
|
|||
// the fresh continuation (coming through getAsyncContinuation)
|
||||
//
|
||||
case (true, false, false) ⇒ {
|
||||
continuation.setTimeout(DefaultTimeout)
|
||||
continuation.setTimeout(app.MistSettings.DefaultTimeout)
|
||||
|
||||
continuation.addContinuationListener(this)
|
||||
continuation.suspend
|
||||
|
|
@ -45,7 +47,7 @@ trait JettyContinuation extends ContinuationListener {
|
|||
//
|
||||
case (true, true, false) ⇒ {
|
||||
|
||||
continuation.setTimeout(DefaultTimeout)
|
||||
continuation.setTimeout(app.MistSettings.DefaultTimeout)
|
||||
continuation.addContinuationListener(this)
|
||||
|
||||
Some(continuation)
|
||||
|
|
@ -56,9 +58,9 @@ trait JettyContinuation extends ContinuationListener {
|
|||
//
|
||||
case (false, false, false) ⇒ {
|
||||
|
||||
continuation.setTimeout(continuation.getAttribute(TimeoutAttribute).asInstanceOf[Long])
|
||||
continuation.setTimeout(continuation.getAttribute(app.MistSettings.TimeoutAttribute).asInstanceOf[Long])
|
||||
continuation.suspend
|
||||
continuation.removeAttribute(TimeoutAttribute)
|
||||
continuation.removeAttribute(app.MistSettings.TimeoutAttribute)
|
||||
|
||||
None
|
||||
}
|
||||
|
|
@ -68,8 +70,8 @@ trait JettyContinuation extends ContinuationListener {
|
|||
//
|
||||
case (false, true, false) ⇒ {
|
||||
|
||||
continuation.setTimeout(continuation.getAttribute(TimeoutAttribute).asInstanceOf[Long])
|
||||
continuation.removeAttribute(TimeoutAttribute)
|
||||
continuation.setTimeout(continuation.getAttribute(app.MistSettings.TimeoutAttribute).asInstanceOf[Long])
|
||||
continuation.removeAttribute(app.MistSettings.TimeoutAttribute)
|
||||
|
||||
None
|
||||
}
|
||||
|
|
@ -85,13 +87,13 @@ trait JettyContinuation extends ContinuationListener {
|
|||
|
||||
def suspended: Boolean = _continuation match {
|
||||
case None ⇒ false
|
||||
case Some(continuation) ⇒ (continuation.isSuspended || (continuation.getAttribute(TimeoutAttribute) ne null))
|
||||
case Some(continuation) ⇒ (continuation.isSuspended || (continuation.getAttribute(app.MistSettings.TimeoutAttribute) ne null))
|
||||
}
|
||||
|
||||
def timeout(ms: Long): Boolean = _continuation match {
|
||||
case None ⇒ false
|
||||
case Some(continuation) ⇒
|
||||
continuation.setAttribute(TimeoutAttribute, ms)
|
||||
continuation.setAttribute(app.MistSettings.TimeoutAttribute, ms)
|
||||
continuation.resume
|
||||
true
|
||||
}
|
||||
|
|
@ -101,12 +103,13 @@ trait JettyContinuation extends ContinuationListener {
|
|||
//
|
||||
def onComplete(c: Continuation) = {}
|
||||
def onTimeout(c: Continuation) = {
|
||||
c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
|
||||
c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(app.MistSettings.ExpiredHeaderName, app.MistSettings.ExpiredHeaderValue)
|
||||
c.complete
|
||||
}
|
||||
}
|
||||
|
||||
object JettyContinuationMethodFactory extends RequestMethodFactory {
|
||||
class JettyContinuationMethodFactory(_app: AkkaApplication) extends RequestMethodFactory {
|
||||
implicit val app = _app
|
||||
def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with JettyContinuation
|
||||
def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with JettyContinuation
|
||||
def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with JettyContinuation
|
||||
|
|
|
|||
|
|
@ -6,30 +6,17 @@ package akka.http
|
|||
|
||||
import akka.event.EventHandler
|
||||
import akka.config.ConfigurationException
|
||||
|
||||
import javax.servlet.http.{ HttpServletResponse, HttpServletRequest }
|
||||
import javax.servlet.http.HttpServlet
|
||||
import javax.servlet.Filter
|
||||
import java.lang.UnsupportedOperationException
|
||||
import akka.actor.{ NullChannel, ActorRef, Actor }
|
||||
import Types._
|
||||
import akka.AkkaApplication
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
object MistSettings {
|
||||
import akka.config.Config._
|
||||
|
||||
val JettyServer = "jetty"
|
||||
val TimeoutAttribute = "timeout"
|
||||
|
||||
val ConnectionClose = config.getBool("akka.http.connection-close", true)
|
||||
val RootActorBuiltin = config.getBool("akka.http.root-actor-builtin", true)
|
||||
val RootActorID = config.getString("akka.http.root-actor-id", "_httproot")
|
||||
val DefaultTimeout = config.getLong("akka.http.timeout", 1000)
|
||||
val ExpiredHeaderName = config.getString("akka.http.expired-header-name", "Async-Timeout")
|
||||
val ExpiredHeaderValue = config.getString("akka.http.expired-header-value", "expired")
|
||||
}
|
||||
|
||||
/**
|
||||
* Structural type alias's required to work with both Servlet 3.0 and Jetty's Continuation API
|
||||
*
|
||||
|
|
@ -60,14 +47,13 @@ object Types {
|
|||
def Headers(): Headers = Nil
|
||||
}
|
||||
|
||||
import Types._
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
trait Mist {
|
||||
import javax.servlet.ServletContext
|
||||
import MistSettings._
|
||||
|
||||
protected def app: AkkaApplication
|
||||
|
||||
/**
|
||||
* The root endpoint actor
|
||||
|
|
@ -113,7 +99,7 @@ trait Mist {
|
|||
|
||||
// shoot the message to the root endpoint for processing
|
||||
// IMPORTANT: the suspend method is invoked on the server thread not in the actor
|
||||
val method = builder(() ⇒ suspend(ConnectionClose))
|
||||
val method = builder(() ⇒ suspend(app.MistSettings.ConnectionClose))
|
||||
if (method.go) root ! method
|
||||
}
|
||||
|
||||
|
|
@ -125,9 +111,9 @@ trait Mist {
|
|||
val server = context.getServerInfo
|
||||
val (major, minor) = (context.getMajorVersion, context.getMinorVersion)
|
||||
factory = if (major >= 3) {
|
||||
Some(Servlet30ContextMethodFactory)
|
||||
} else if (server.toLowerCase startsWith JettyServer) {
|
||||
Some(JettyContinuationMethodFactory)
|
||||
Some(new Servlet30ContextMethodFactory(app))
|
||||
} else if (server.toLowerCase startsWith app.MistSettings.JettyServer) {
|
||||
Some(new JettyContinuationMethodFactory(app))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
|
@ -137,12 +123,14 @@ trait Mist {
|
|||
trait RootEndpointLocator {
|
||||
var root: ActorRef = null
|
||||
|
||||
protected def app: AkkaApplication
|
||||
|
||||
def configureRoot(address: String) {
|
||||
def findRoot(address: String): ActorRef =
|
||||
Actor.provider.actorFor(address).getOrElse(
|
||||
app.provider.actorFor(address).getOrElse(
|
||||
throw new ConfigurationException("akka.http.root-actor-id configuration option does not have a valid actor address [" + address + "]"))
|
||||
|
||||
root = if ((address eq null) || address == "") findRoot(MistSettings.RootActorID) else findRoot(address)
|
||||
root = if ((address eq null) || address == "") findRoot(app.MistSettings.RootActorID) else findRoot(address)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -150,7 +138,7 @@ trait RootEndpointLocator {
|
|||
* AkkaMistServlet adds support to bridge Http and Actors in an asynchronous fashion
|
||||
* Async impls currently supported: Servlet3.0, Jetty Continuations
|
||||
*/
|
||||
class AkkaMistServlet extends HttpServlet with Mist with RootEndpointLocator {
|
||||
class AkkaMistServlet(val app: AkkaApplication) extends HttpServlet with Mist with RootEndpointLocator {
|
||||
import javax.servlet.{ ServletConfig }
|
||||
|
||||
/**
|
||||
|
|
@ -169,7 +157,7 @@ class AkkaMistServlet extends HttpServlet with Mist with RootEndpointLocator {
|
|||
* Proof-of-concept, use at own risk
|
||||
* Will be officially supported in a later release
|
||||
*/
|
||||
class AkkaMistFilter extends Filter with Mist with RootEndpointLocator {
|
||||
class AkkaMistFilter(val app: AkkaApplication) extends Filter with Mist with RootEndpointLocator {
|
||||
import javax.servlet.{ ServletRequest, ServletResponse, FilterConfig, FilterChain }
|
||||
|
||||
/**
|
||||
|
|
@ -205,7 +193,7 @@ object Endpoint {
|
|||
/**
|
||||
* leverage the akka config to tweak the dispatcher for our endpoints
|
||||
*/
|
||||
lazy val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
|
||||
//lazy val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher")
|
||||
|
||||
type Hook = PartialFunction[String, ActorRef]
|
||||
|
||||
|
|
@ -276,7 +264,6 @@ trait Endpoint { this: Actor ⇒
|
|||
|
||||
class RootEndpoint extends Actor with Endpoint {
|
||||
import Endpoint._
|
||||
import MistSettings._
|
||||
|
||||
final val Root = "/"
|
||||
|
||||
|
|
@ -308,6 +295,8 @@ trait RequestMethod {
|
|||
import java.io.IOException
|
||||
import javax.servlet.http.{ HttpServletResponse, HttpServletRequest }
|
||||
|
||||
def app: AkkaApplication
|
||||
|
||||
// required implementations
|
||||
val builder: () ⇒ tAsyncRequestContext
|
||||
|
||||
|
|
@ -371,7 +360,7 @@ trait RequestMethod {
|
|||
}
|
||||
} catch {
|
||||
case io: Exception ⇒
|
||||
EventHandler.error(io, this, io.getMessage)
|
||||
app.eventHandler.error(io, this, io.getMessage)
|
||||
false
|
||||
}
|
||||
case None ⇒ false
|
||||
|
|
@ -387,7 +376,7 @@ trait RequestMethod {
|
|||
}
|
||||
} catch {
|
||||
case io: IOException ⇒
|
||||
EventHandler.error(io, this, io.getMessage)
|
||||
app.eventHandler.error(io, this, io.getMessage)
|
||||
}
|
||||
case None ⇒ {}
|
||||
}
|
||||
|
|
@ -414,13 +403,13 @@ trait RequestMethod {
|
|||
def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString)))
|
||||
}
|
||||
|
||||
abstract class Delete(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Get(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Head(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Options(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Post(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Put(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Trace(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod
|
||||
abstract class Delete(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod
|
||||
abstract class Get(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod
|
||||
abstract class Head(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod
|
||||
abstract class Options(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod
|
||||
abstract class Post(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod
|
||||
abstract class Put(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod
|
||||
abstract class Trace(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod
|
||||
|
||||
trait RequestMethodFactory {
|
||||
def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod
|
||||
|
|
|
|||
|
|
@ -4,17 +4,17 @@
|
|||
|
||||
package akka.http
|
||||
|
||||
import javax.servlet.{ AsyncContext, AsyncListener, AsyncEvent };
|
||||
import javax.servlet.{ AsyncContext, AsyncListener, AsyncEvent }
|
||||
import Types._
|
||||
|
||||
import akka.event.EventHandler
|
||||
import akka.AkkaApplication
|
||||
|
||||
/**
|
||||
* @author Garrick Evans
|
||||
*/
|
||||
trait Servlet30Context extends AsyncListener {
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
import MistSettings._
|
||||
|
||||
def app: AkkaApplication
|
||||
|
||||
val builder: () ⇒ tAsyncRequestContext
|
||||
val context: Option[tAsyncRequestContext] = Some(builder())
|
||||
|
|
@ -22,7 +22,7 @@ trait Servlet30Context extends AsyncListener {
|
|||
|
||||
protected val _ac: AsyncContext = {
|
||||
val ac = context.get.asInstanceOf[AsyncContext]
|
||||
ac setTimeout DefaultTimeout
|
||||
ac setTimeout app.MistSettings.DefaultTimeout
|
||||
ac addListener this
|
||||
ac
|
||||
}
|
||||
|
|
@ -35,7 +35,7 @@ trait Servlet30Context extends AsyncListener {
|
|||
true
|
||||
} catch {
|
||||
case e: IllegalStateException ⇒
|
||||
EventHandler.error(e, this, e.getMessage)
|
||||
app.eventHandler.error(e, this, e.getMessage)
|
||||
false
|
||||
}
|
||||
}
|
||||
|
|
@ -46,16 +46,17 @@ trait Servlet30Context extends AsyncListener {
|
|||
def onComplete(e: AsyncEvent) {}
|
||||
def onError(e: AsyncEvent) = e.getThrowable match {
|
||||
case null ⇒
|
||||
case t ⇒ EventHandler.error(t, this, t.getMessage)
|
||||
case t ⇒ app.eventHandler.error(t, this, t.getMessage)
|
||||
}
|
||||
def onStartAsync(e: AsyncEvent) {}
|
||||
def onTimeout(e: AsyncEvent) = {
|
||||
e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(ExpiredHeaderName, ExpiredHeaderValue)
|
||||
e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(app.MistSettings.ExpiredHeaderName, app.MistSettings.ExpiredHeaderValue)
|
||||
e.getAsyncContext.complete
|
||||
}
|
||||
}
|
||||
|
||||
object Servlet30ContextMethodFactory extends RequestMethodFactory {
|
||||
class Servlet30ContextMethodFactory(_app: AkkaApplication) extends RequestMethodFactory {
|
||||
implicit val app = _app
|
||||
def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with Servlet30Context
|
||||
def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with Servlet30Context
|
||||
def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with Servlet30Context
|
||||
|
|
|
|||
|
|
@ -4,18 +4,17 @@
|
|||
|
||||
package akka.config
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ConfigSpec extends WordSpec with MustMatchers {
|
||||
class ConfigSpec extends AkkaSpec {
|
||||
|
||||
"The default configuration file (i.e. akka-reference.conf)" should {
|
||||
"contain all configuration properties for akka-http that are used in code with their correct defaults" in {
|
||||
import Config.config._
|
||||
|
||||
import app.config._
|
||||
getBool("akka.http.connection-close") must equal(Some(true))
|
||||
getString("akka.http.expired-header-name") must equal(Some("Async-Timeout"))
|
||||
getString("akka.http.hostname") must equal(Some("localhost"))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue