From 09ceb8ec92a52cbf775857cc04c2cba367a59494 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 9 Nov 2011 15:47:57 +0100 Subject: [PATCH] Removing akka-http, making so that 'waves of actors'-test fails when there's a problem and removing unused config sections in the conf file --- .../scala/akka/http/JettyContinuation.scala | 121 ----- akka-http/src/main/scala/akka/http/Mist.scala | 420 ------------------ .../scala/akka/http/Servlet30Context.scala | 68 --- .../src/test/scala/config/ConfigSpec.scala | 28 -- 4 files changed, 637 deletions(-) delete mode 100644 akka-http/src/main/scala/akka/http/JettyContinuation.scala delete mode 100644 akka-http/src/main/scala/akka/http/Mist.scala delete mode 100644 akka-http/src/main/scala/akka/http/Servlet30Context.scala delete mode 100644 akka-http/src/test/scala/config/ConfigSpec.scala diff --git a/akka-http/src/main/scala/akka/http/JettyContinuation.scala b/akka-http/src/main/scala/akka/http/JettyContinuation.scala deleted file mode 100644 index 6d03d24636..0000000000 --- a/akka-http/src/main/scala/akka/http/JettyContinuation.scala +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Copyright 2010 Autodesk, Inc. All rights reserved. - * Licensed under Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. - * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. - */ - -package akka.http - -import org.eclipse.jetty.server._ -import org.eclipse.jetty.continuation._ -import Types._ -import akka.AkkaApplication - -/** - * @author Garrick Evans - */ -trait JettyContinuation extends ContinuationListener { - import javax.servlet.http.HttpServletResponse - - def app: AkkaApplication - - val builder: () ⇒ tAsyncRequestContext - val context: Option[tAsyncRequestContext] = Some(builder()) - def go = _continuation.isDefined - - protected val _continuation: Option[AsyncContinuation] = { - - val continuation = context.get.asInstanceOf[AsyncContinuation] - - (continuation.isInitial, - continuation.isSuspended, - continuation.isExpired) match { - - // - // the fresh continuation (coming through getAsyncContinuation) - // - case (true, false, false) ⇒ { - continuation.setTimeout(app.MistSettings.DefaultTimeout) - - continuation.addContinuationListener(this) - continuation.suspend - - Some(continuation) - } - // - // the fresh continuation (coming through startAsync instead) - // - case (true, true, false) ⇒ { - - continuation.setTimeout(app.MistSettings.DefaultTimeout) - continuation.addContinuationListener(this) - - Some(continuation) - } - // - // the timeout was reset and the continuation was resumed - // this happens when used with getAsyncContinuation - // - case (false, false, false) ⇒ { - - continuation.setTimeout(continuation.getAttribute(app.MistSettings.TimeoutAttribute).asInstanceOf[Long]) - continuation.suspend - continuation.removeAttribute(app.MistSettings.TimeoutAttribute) - - None - } - // - // the timeout was reset and the continuation is still suspended - // this happens when used with startAsync - // - case (false, true, false) ⇒ { - - continuation.setTimeout(continuation.getAttribute(app.MistSettings.TimeoutAttribute).asInstanceOf[Long]) - continuation.removeAttribute(app.MistSettings.TimeoutAttribute) - - None - } - // - // unexpected continution state(s) - log and do nothing - // - case _ ⇒ { - //continuation.cancel - None - } - } - } - - def suspended: Boolean = _continuation match { - case None ⇒ false - case Some(continuation) ⇒ (continuation.isSuspended || (continuation.getAttribute(app.MistSettings.TimeoutAttribute) ne null)) - } - - def timeout(ms: Long): Boolean = _continuation match { - case None ⇒ false - case Some(continuation) ⇒ - continuation.setAttribute(app.MistSettings.TimeoutAttribute, ms) - continuation.resume - true - } - - // - // ContinuationListener - // - def onComplete(c: Continuation) = {} - def onTimeout(c: Continuation) = { - c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(app.MistSettings.ExpiredHeaderName, app.MistSettings.ExpiredHeaderValue) - c.complete - } -} - -class JettyContinuationMethodFactory(_app: AkkaApplication) extends RequestMethodFactory { - implicit val app = _app - def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with JettyContinuation - def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with JettyContinuation - def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with JettyContinuation - def Options(f: () ⇒ tAsyncRequestContext): RequestMethod = new Options(f) with JettyContinuation - def Post(f: () ⇒ tAsyncRequestContext): RequestMethod = new Post(f) with JettyContinuation - def Put(f: () ⇒ tAsyncRequestContext): RequestMethod = new Put(f) with JettyContinuation - def Trace(f: () ⇒ tAsyncRequestContext): RequestMethod = new Trace(f) with JettyContinuation -} - diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala deleted file mode 100644 index 109804f73f..0000000000 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ /dev/null @@ -1,420 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.http - -import akka.event.EventHandler -import akka.config.ConfigurationException -import javax.servlet.http.{ HttpServletResponse, HttpServletRequest } -import javax.servlet.http.HttpServlet -import javax.servlet.Filter -import java.lang.UnsupportedOperationException -import akka.actor.{ ActorRef, Actor } -import Types._ -import akka.AkkaApplication - -/** - * @author Garrick Evans - */ -/** - * Structural type alias's required to work with both Servlet 3.0 and Jetty's Continuation API - * - * @author Garrick Evans - */ -object Types { - import javax.servlet.{ ServletRequest, ServletResponse } - - /** - * Represents an asynchronous request - */ - type tAsyncRequest = { - def startAsync: tAsyncRequestContext - } - - /** - * Used to match both AsyncContext and AsyncContinuation in order to complete the request - */ - type tAsyncRequestContext = { - def complete: Unit - def getRequest: ServletRequest - def getResponse: ServletResponse - } - - type Header = Tuple2[String, String] - type Headers = List[Header] - - def Headers(): Headers = Nil -} - -/** - * - */ -trait Mist { - import javax.servlet.ServletContext - - protected def app: AkkaApplication - - /** - * The root endpoint actor - */ - def root: ActorRef - - /** - * Server-specific method factory - */ - protected var factory: Option[RequestMethodFactory] = None - - /** - * Handles all servlet requests - */ - protected def mistify(request: HttpServletRequest, - response: HttpServletResponse) = { - - val builder: (() ⇒ tAsyncRequestContext) ⇒ RequestMethod = - request.getMethod.toUpperCase match { - case "DELETE" ⇒ factory.get.Delete - case "GET" ⇒ factory.get.Get - case "HEAD" ⇒ factory.get.Head - case "OPTIONS" ⇒ factory.get.Options - case "POST" ⇒ factory.get.Post - case "PUT" ⇒ factory.get.Put - case "TRACE" ⇒ factory.get.Trace - case unknown ⇒ throw new UnsupportedOperationException(unknown) - } - - def suspend(closeConnection: Boolean): tAsyncRequestContext = { - - // set to right now, which is effectively "already expired" - response.setDateHeader("Expires", System.currentTimeMillis) - response.setHeader("Cache-Control", "no-cache, must-revalidate") - - // no keep-alive? - if (closeConnection) response.setHeader("Connection", "close") - - // suspend the request - // TODO: move this out to the specialized support if jetty asyncstart doesnt let us update TOs - request.asInstanceOf[tAsyncRequest].startAsync.asInstanceOf[tAsyncRequestContext] - } - - // shoot the message to the root endpoint for processing - // IMPORTANT: the suspend method is invoked on the server thread not in the actor - val method = builder(() ⇒ suspend(app.MistSettings.ConnectionClose)) - if (method.go) root ! method - } - - /** - * Sets up what mist needs to be able to service requests - * must be called prior to dispatching to "mistify" - */ - def initMist(context: ServletContext) { - val server = context.getServerInfo - val (major, minor) = (context.getMajorVersion, context.getMinorVersion) - factory = if (major >= 3) { - Some(new Servlet30ContextMethodFactory(app)) - } else if (server.toLowerCase startsWith app.MistSettings.JettyServer) { - Some(new JettyContinuationMethodFactory(app)) - } else { - None - } - } -} - -trait RootEndpointLocator { - var root: ActorRef = null - - protected def app: AkkaApplication - - def configureRoot(address: String) { - def findRoot(address: String): ActorRef = - app.provider.actorFor(address).getOrElse( - throw new ConfigurationException("akka.http.root-actor-id configuration option does not have a valid actor address [" + address + "]")) - - root = if ((address eq null) || address == "") findRoot(app.MistSettings.RootActorID) else findRoot(address) - } -} - -/** - * AkkaMistServlet adds support to bridge Http and Actors in an asynchronous fashion - * Async impls currently supported: Servlet3.0, Jetty Continuations - */ -class AkkaMistServlet(val app: AkkaApplication) extends HttpServlet with Mist with RootEndpointLocator { - import javax.servlet.{ ServletConfig } - - /** - * Initializes Mist - */ - override def init(config: ServletConfig) { - super.init(config) - initMist(config.getServletContext) - configureRoot(config.getServletContext.getInitParameter("root-endpoint")) - } - - protected override def service(req: HttpServletRequest, res: HttpServletResponse) = mistify(req, res) -} - -/** - * Proof-of-concept, use at own risk - * Will be officially supported in a later release - */ -class AkkaMistFilter(val app: AkkaApplication) extends Filter with Mist with RootEndpointLocator { - import javax.servlet.{ ServletRequest, ServletResponse, FilterConfig, FilterChain } - - /** - * Initializes Mist - */ - def init(config: FilterConfig) { - initMist(config.getServletContext) - configureRoot(config.getServletContext.getInitParameter("root-endpoint")) - } - - /** - * Decide how/if to handle the request - */ - override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain) { - (req, res) match { - case (hreq: HttpServletRequest, hres: HttpServletResponse) ⇒ - mistify(hreq, hres) - chain.doFilter(req, res) - case _ ⇒ chain.doFilter(req, res) - } - } - - override def destroy {} -} - -/////////////////////////////////////////// -// Endpoints -/////////////////////////////////////////// - -object Endpoint { - import akka.dispatch.Dispatchers - - /** - * leverage the akka config to tweak the dispatcher for our endpoints - */ - //lazy val Dispatcher = Dispatchers.fromConfig("akka.http.mist-dispatcher") - - type Hook = PartialFunction[String, ActorRef] - - case class Attach(hook: Hook) { - //Only here for backwards compat, can possibly be thrown away - def this(hook: String ⇒ Boolean, provider: String ⇒ ActorRef) = this({ - case x if hook(x) ⇒ provider(x) - }) - } - - case class NoneAvailable(uri: String, req: RequestMethod) -} - -/** - * @author Garrick Evans - */ -trait Endpoint { this: Actor ⇒ - - import Endpoint._ - - /** - * A convenience method to get the actor ref - */ - def actor: ActorRef = this.self - - /** - * The list of connected endpoints to which this one should/could forward the request. - * If the hook func returns true, the message will be sent to the actor returned from provider. - */ - protected var _attachments = List[Hook]() - - /** - * - */ - protected def _attach(hook: Hook) = _attachments ::= hook - - /** - * Message handling common to all endpoints, must be chained - */ - protected def handleHttpRequest: Receive = { - - // add the endpoint - the if the uri hook matches, - // the message will be sent to the actor returned by the provider func - case Attach(hook) ⇒ _attach(hook) - - // dispatch the suspended requests - case req: RequestMethod ⇒ { - val uri = req.request.getPathInfo - val endpoints = _attachments.filter { _ isDefinedAt uri } - - if (!endpoints.isEmpty) endpoints.foreach { _.apply(uri) ! req } - else { - if (sender.isShutdown) _na(uri, req) - else sender ! NoneAvailable(uri, req) - } - } - } - - /** - * no endpoint available - completes the request with a 404 - */ - protected def _na(uri: String, req: RequestMethod) = { - req.NotFound("No endpoint available for [" + uri + "]") - } -} - -class RootEndpoint extends Actor with Endpoint { - import Endpoint._ - - final val Root = "/" - - override def preStart() = - _attachments ::= { case `Root` ⇒ this.actor } - - def recv: Receive = { - case NoneAvailable(uri, req) ⇒ _na(uri, req) - case unknown ⇒ - } - - /** - * Note that root is a little different, other endpoints should chain their own recv first - */ - def receive = handleHttpRequest orElse recv -} - -/////////////////////////////////////////// -// RequestMethods -/////////////////////////////////////////// - -/** - * Basic description of the suspended async http request. - * Must be mixed with some kind of specific support (e.g. servlet 3.0 or jetty continuations) - * - * @author Garrick Evans - */ -trait RequestMethod { - import java.io.IOException - import javax.servlet.http.{ HttpServletResponse, HttpServletRequest } - - def app: AkkaApplication - - // required implementations - val builder: () ⇒ tAsyncRequestContext - - /** - * Provides a general type for the underlying context - * - * @return a completable request context - */ - val context: Option[tAsyncRequestContext] - def go: Boolean - - /** - * Updates (resets) the timeout - * - * @return true if updated, false if not supported - */ - def timeout(ms: Long): Boolean - - /** - * Status of the suspension - */ - def suspended: Boolean - - // - // convenience funcs - // - - def request = context.get.getRequest.asInstanceOf[HttpServletRequest] - def response = context.get.getResponse.asInstanceOf[HttpServletResponse] - - def getHeaderOrElse(name: String, default: ⇒ String): String = - request.getHeader(name) match { - case null ⇒ default - case s ⇒ s - } - - def getParameterOrElse(name: String, default: ⇒ String): String = - request.getParameter(name) match { - case null ⇒ default - case s ⇒ s - } - - def complete(status: Int, body: String, headers: Headers = Headers()): Boolean = - rawComplete { res ⇒ - res.setStatus(status) - headers foreach { case (name, value) ⇒ response.setHeader(name, value) } - res.getWriter.write(body) - res.getWriter.close - res.flushBuffer - } - - def rawComplete(completion: HttpServletResponse ⇒ Unit): Boolean = - context match { - case Some(pipe) ⇒ - try { - if (!suspended) false - else { - completion(response) - pipe.complete - true - } - } catch { - case io: Exception ⇒ - app.eventHandler.error(io, this, io.getMessage) - false - } - case None ⇒ false - } - - def complete(t: Throwable) { - context match { - case Some(pipe) ⇒ - try { - if (suspended) { - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to write data to connection on resume") - pipe.complete - } - } catch { - case io: IOException ⇒ - app.eventHandler.error(io, this, io.getMessage) - } - case None ⇒ {} - } - } - - /* - * Utility methods to send responses back - */ - def OK(body: String): Boolean = complete(HttpServletResponse.SC_OK, body) - def OK(body: String, headers: Headers): Boolean = complete(HttpServletResponse.SC_OK, body, headers) - def Created(body: String): Boolean = complete(HttpServletResponse.SC_CREATED, body) - def Accepted(body: String): Boolean = complete(HttpServletResponse.SC_ACCEPTED, body) - def NotModified(body: String): Boolean = complete(HttpServletResponse.SC_NOT_MODIFIED, body) - def BadRequest(body: String): Boolean = complete(HttpServletResponse.SC_BAD_REQUEST, body) - def Unauthorized(body: String): Boolean = complete(HttpServletResponse.SC_UNAUTHORIZED, body) - def Forbidden(body: String): Boolean = complete(HttpServletResponse.SC_FORBIDDEN, body) - def NotAllowed(body: String): Boolean = complete(HttpServletResponse.SC_METHOD_NOT_ALLOWED, body) - def NotFound(body: String): Boolean = complete(HttpServletResponse.SC_NOT_FOUND, body) - def Timeout(body: String): Boolean = complete(HttpServletResponse.SC_REQUEST_TIMEOUT, body) - def Conflict(body: String): Boolean = complete(HttpServletResponse.SC_CONFLICT, body) - def UnsupportedMediaType(body: String): Boolean = complete(HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE, body) - def Error(body: String): Boolean = complete(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, body) - def NotImplemented(body: String): Boolean = complete(HttpServletResponse.SC_NOT_IMPLEMENTED, body) - def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString))) -} - -abstract class Delete(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod -abstract class Get(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod -abstract class Head(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod -abstract class Options(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod -abstract class Post(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod -abstract class Put(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod -abstract class Trace(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod - -trait RequestMethodFactory { - def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod - def Get(f: () ⇒ tAsyncRequestContext): RequestMethod - def Head(f: () ⇒ tAsyncRequestContext): RequestMethod - def Options(f: () ⇒ tAsyncRequestContext): RequestMethod - def Post(f: () ⇒ tAsyncRequestContext): RequestMethod - def Put(f: () ⇒ tAsyncRequestContext): RequestMethod - def Trace(f: () ⇒ tAsyncRequestContext): RequestMethod -} diff --git a/akka-http/src/main/scala/akka/http/Servlet30Context.scala b/akka-http/src/main/scala/akka/http/Servlet30Context.scala deleted file mode 100644 index fd797b8a5c..0000000000 --- a/akka-http/src/main/scala/akka/http/Servlet30Context.scala +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.http - -import javax.servlet.{ AsyncContext, AsyncListener, AsyncEvent } -import Types._ -import akka.AkkaApplication - -/** - * @author Garrick Evans - */ -trait Servlet30Context extends AsyncListener { - import javax.servlet.http.HttpServletResponse - - def app: AkkaApplication - - val builder: () ⇒ tAsyncRequestContext - val context: Option[tAsyncRequestContext] = Some(builder()) - def go = context.isDefined - - protected val _ac: AsyncContext = { - val ac = context.get.asInstanceOf[AsyncContext] - ac setTimeout app.MistSettings.DefaultTimeout - ac addListener this - ac - } - - def suspended = true - - def timeout(ms: Long): Boolean = { - try { - _ac setTimeout ms - true - } catch { - case e: IllegalStateException ⇒ - app.eventHandler.error(e, this, e.getMessage) - false - } - } - - // - // AsyncListener - // - def onComplete(e: AsyncEvent) {} - def onError(e: AsyncEvent) = e.getThrowable match { - case null ⇒ - case t ⇒ app.eventHandler.error(t, this, t.getMessage) - } - def onStartAsync(e: AsyncEvent) {} - def onTimeout(e: AsyncEvent) = { - e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(app.MistSettings.ExpiredHeaderName, app.MistSettings.ExpiredHeaderValue) - e.getAsyncContext.complete - } -} - -class Servlet30ContextMethodFactory(_app: AkkaApplication) extends RequestMethodFactory { - implicit val app = _app - def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with Servlet30Context - def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with Servlet30Context - def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with Servlet30Context - def Options(f: () ⇒ tAsyncRequestContext): RequestMethod = new Options(f) with Servlet30Context - def Post(f: () ⇒ tAsyncRequestContext): RequestMethod = new Post(f) with Servlet30Context - def Put(f: () ⇒ tAsyncRequestContext): RequestMethod = new Put(f) with Servlet30Context - def Trace(f: () ⇒ tAsyncRequestContext): RequestMethod = new Trace(f) with Servlet30Context -} - diff --git a/akka-http/src/test/scala/config/ConfigSpec.scala b/akka-http/src/test/scala/config/ConfigSpec.scala deleted file mode 100644 index d73099840b..0000000000 --- a/akka-http/src/test/scala/config/ConfigSpec.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.config - -import akka.testkit.AkkaSpec - -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ConfigSpec extends AkkaSpec { - - "The default configuration file (i.e. akka-reference.conf)" should { - "contain all configuration properties for akka-http that are used in code with their correct defaults" in { - import app.config._ - getBool("akka.http.connection-close") must equal(Some(true)) - getString("akka.http.expired-header-name") must equal(Some("Async-Timeout")) - getString("akka.http.hostname") must equal(Some("localhost")) - getString("akka.http.expired-header-value") must equal(Some("expired")) - getInt("akka.http.port") must equal(Some(9998)) - getBool("akka.http.root-actor-builtin") must equal(Some(true)) - getString("akka.http.root-actor-id") must equal(Some("_httproot")) - getLong("akka.http.timeout") must equal(Some(1000)) - } - } -}