=htc small refactoring in OneHundredContinue support

This commit is contained in:
Mathias 2014-12-18 17:04:45 +01:00
parent 44a684c390
commit 4d907bfb50
3 changed files with 96 additions and 93 deletions

View file

@ -0,0 +1,35 @@
package akka.http.engine
import scala.annotation.tailrec
import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher }
/**
* An actor publisher for producing a simple stream of singleton tokens
* the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message.
*/
private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] {
private var triggered = 0
def receive = {
case TokenSourceActor.Trigger
triggered += 1
tryDispatch()
case ActorPublisherMessage.Request(_)
tryDispatch()
case ActorPublisherMessage.Cancel
context.stop(self)
}
@tailrec private def tryDispatch(): Unit =
if (triggered > 0 && totalDemand > 0) {
onNext(token)
triggered -= 1
tryDispatch()
}
}
private[engine] object TokenSourceActor {
case object Trigger
}

View file

@ -4,10 +4,10 @@
package akka.http.engine.server
import scala.util.control.NonFatal
import akka.actor.{ ActorRef, Props }
import akka.util.ByteString
import akka.event.LoggingAdapter
import akka.stream.stage.PushPullStage
import akka.stream.scaladsl.OperationAttributes._
import akka.stream.FlattenStrategy
import akka.stream.scaladsl._
@ -15,11 +15,9 @@ import akka.stream.stage.PushPullStage
import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser }
import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory }
import akka.http.engine.parsing.ParserOutput._
import akka.http.engine.TokenSourceActor
import akka.http.model._
import akka.http.util._
import akka.http.Http
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -44,7 +42,7 @@ private[http] object HttpServer {
@volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168
val oneHundredContinueSource = Source[OneHundredContinue.type] {
Props {
val actor = new OneHundredContinueSourceActor
val actor = new TokenSourceActor(OneHundredContinue)
oneHundredContinueRef = Some(actor.context.self)
actor
}
@ -56,7 +54,7 @@ private[http] object HttpServer {
val requestParsing = Flow[ByteString].section(name("rootParser"))(_.transform(()
// each connection uses a single (private) request parser instance for all its requests
// which builds a cache of all header instances seen on that connection
rootParser.createShallowCopy(() oneHundredContinueRef)))
rootParser.createShallowCopy(() oneHundredContinueRef).stage))
val requestPreparation =
Flow[RequestOutput]
@ -129,9 +127,9 @@ private[http] object HttpServer {
override def initialCompletionHandling = CompletionHandling(
onComplete = (ctx, _) { ctx.complete(); SameState },
onError = {
case (ctx, _, error: Http.StreamException)
case (ctx, _, EntityStreamException(errorInfo))
// the application has forwarded a request entity stream error to the response stream
finishWithError(ctx, "request", StatusCodes.BadRequest, error.info)
finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo)
case (ctx, _, error)
ctx.error(error)
SameState
@ -150,27 +148,63 @@ private[http] object HttpServer {
}
}
}
}
private[server] class ErrorsTo500ResponseRecovery(log: LoggingAdapter)
extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] {
import akka.stream.stage.Context
/**
* The `Expect: 100-continue` header has a special status in HTTP.
* It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending
* (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to
* accept the request and responds with
*
* - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation
* (or if the `Expect` header contains other, unsupported expectations).
* - a `100 Continue` response,
* if it is ready to accept the request entity and the client should go ahead with sending it
* - a final response (like a 4xx to signal some client-side error
* (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect)
*
* Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request
* entity. In this case it will receive another response after having completed request sending.
* So this special feature breaks the normal "one request - one response" logic of HTTP!
* It therefore requires special handling in all HTTP stacks (client- and server-side).
*
* For us this means:
*
* - on the server-side:
* After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send
* it through to the application. Only when (and if) the application then requests data from the entity stream do we
* send out a `100 Continue` response and continue reading the request entity.
* The application can therefore determine itself whether it wants the client to send the request entity
* by deciding whether to look at the request entity data stream or not.
* If the application sends a response *without* having looked at the request entity the client receives this
* response *instead of* the `100 Continue` response and the server closes the connection afterwards.
*
* - on the client-side:
* If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until
* we've received a `100 Continue` response.
*/
case object OneHundredContinue
private[this] var errorResponse: ResponseRenderingContext = _
final class ErrorsTo500ResponseRecovery(log: LoggingAdapter)
extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] {
override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem)
import akka.stream.stage.Context
override def onPull(ctx: Context[ResponseRenderingContext]) =
if (ctx.isFinishing) ctx.pushAndFinish(errorResponse)
else ctx.pull()
private[this] var errorResponse: ResponseRenderingContext = _
override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) =
error match {
case NonFatal(e)
log.error(e, "Internal server error, sending 500 response")
errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError),
closeAfterResponseCompletion = true)
ctx.absorbTermination()
case _ ctx.fail(error)
}
}
override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem)
override def onPull(ctx: Context[ResponseRenderingContext]) =
if (ctx.isFinishing) ctx.pushAndFinish(errorResponse)
else ctx.pull()
override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) =
error match {
case NonFatal(e)
log.error(e, "Internal server error, sending 500 response")
errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError),
closeAfterResponseCompletion = true)
ctx.absorbTermination()
case _ ctx.fail(error)
}
}
}

View file

@ -1,66 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.engine.server
import scala.annotation.tailrec
import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher }
/**
* The `Expect: 100-continue` header has a special status in HTTP.
* It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending
* (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to
* accept the request and responds with
*
* - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation
* (or if the `Expect` header contains other, unsupported expectations).
* - a `100 Continue` response,
* if it is ready to accept the request entity and the client should go ahead with sending it
* - a final response (like a 4xx to signal some client-side error
* (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect)
*
* Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request
* entity. In this case it will receive another response after having completed request sending.
* So this special feature breaks the normal "one request - one response" logic of HTTP!
* It therefore requires special handling in all HTTP stacks (client- and server-side).
*
* For us this means:
*
* - on the server-side:
* After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send
* it through to the application. Only when (and if) the application then requests data from the entity stream do we
* send out a `100 Continue` response and continue reading the request entity.
* The application can therefore determine itself whether it wants the client to send the request entity
* by deciding whether to look at the request entity data stream or not.
* If the application sends a response *without* having looked at the request entity the client receives this
* response *instead of* the `100 Continue` response and the server closes the connection afterwards.
*
* - on the client-side:
* If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until
* we've received a `100 Continue` response.
*/
private[engine] case object OneHundredContinue
private[engine] class OneHundredContinueSourceActor extends ActorPublisher[OneHundredContinue.type] {
private var triggered = 0
def receive = {
case OneHundredContinue
triggered += 1
tryDispatch()
case ActorPublisherMessage.Request(_)
tryDispatch()
case ActorPublisherMessage.Cancel
context.stop(self)
}
@tailrec private def tryDispatch(): Unit =
if (triggered > 0 && totalDemand > 0) {
onNext(OneHundredContinue)
triggered -= 1
tryDispatch()
}
}