diff --git a/akka-http-core/src/main/scala/akka/http/engine/TokenSourceActor.scala b/akka-http-core/src/main/scala/akka/http/engine/TokenSourceActor.scala new file mode 100644 index 0000000000..8ffffec751 --- /dev/null +++ b/akka-http-core/src/main/scala/akka/http/engine/TokenSourceActor.scala @@ -0,0 +1,35 @@ +package akka.http.engine + +import scala.annotation.tailrec +import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher } + +/** + * An actor publisher for producing a simple stream of singleton tokens + * the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message. + */ +private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] { + private var triggered = 0 + + def receive = { + case TokenSourceActor.Trigger ⇒ + triggered += 1 + tryDispatch() + + case ActorPublisherMessage.Request(_) ⇒ + tryDispatch() + + case ActorPublisherMessage.Cancel ⇒ + context.stop(self) + } + + @tailrec private def tryDispatch(): Unit = + if (triggered > 0 && totalDemand > 0) { + onNext(token) + triggered -= 1 + tryDispatch() + } +} + +private[engine] object TokenSourceActor { + case object Trigger +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index dfcd54cfdf..ddb57ee22a 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -4,10 +4,10 @@ package akka.http.engine.server +import scala.util.control.NonFatal import akka.actor.{ ActorRef, Props } import akka.util.ByteString import akka.event.LoggingAdapter -import akka.stream.stage.PushPullStage import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlattenStrategy import akka.stream.scaladsl._ @@ -15,11 +15,9 @@ import akka.stream.stage.PushPullStage import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.engine.parsing.ParserOutput._ +import akka.http.engine.TokenSourceActor import akka.http.model._ import akka.http.util._ -import akka.http.Http - -import scala.util.control.NonFatal /** * INTERNAL API @@ -44,7 +42,7 @@ private[http] object HttpServer { @volatile var oneHundredContinueRef: Option[ActorRef] = None // FIXME: unnecessary after fixing #16168 val oneHundredContinueSource = Source[OneHundredContinue.type] { Props { - val actor = new OneHundredContinueSourceActor + val actor = new TokenSourceActor(OneHundredContinue) oneHundredContinueRef = Some(actor.context.self) actor } @@ -56,7 +54,7 @@ private[http] object HttpServer { val requestParsing = Flow[ByteString].section(name("rootParser"))(_.transform(() ⇒ // each connection uses a single (private) request parser instance for all its requests // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(() ⇒ oneHundredContinueRef))) + rootParser.createShallowCopy(() ⇒ oneHundredContinueRef).stage)) val requestPreparation = Flow[RequestOutput] @@ -129,9 +127,9 @@ private[http] object HttpServer { override def initialCompletionHandling = CompletionHandling( onComplete = (ctx, _) ⇒ { ctx.complete(); SameState }, onError = { - case (ctx, _, error: Http.StreamException) ⇒ + case (ctx, _, EntityStreamException(errorInfo)) ⇒ // the application has forwarded a request entity stream error to the response stream - finishWithError(ctx, "request", StatusCodes.BadRequest, error.info) + finishWithError(ctx, "request", StatusCodes.BadRequest, errorInfo) case (ctx, _, error) ⇒ ctx.error(error) SameState @@ -150,27 +148,63 @@ private[http] object HttpServer { } } } -} -private[server] class ErrorsTo500ResponseRecovery(log: LoggingAdapter) - extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] { - import akka.stream.stage.Context + /** + * The `Expect: 100-continue` header has a special status in HTTP. + * It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending + * (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to + * accept the request and responds with + * + * - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation + * (or if the `Expect` header contains other, unsupported expectations). + * - a `100 Continue` response, + * if it is ready to accept the request entity and the client should go ahead with sending it + * - a final response (like a 4xx to signal some client-side error + * (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect) + * + * Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request + * entity. In this case it will receive another response after having completed request sending. + * So this special feature breaks the normal "one request - one response" logic of HTTP! + * It therefore requires special handling in all HTTP stacks (client- and server-side). + * + * For us this means: + * + * - on the server-side: + * After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send + * it through to the application. Only when (and if) the application then requests data from the entity stream do we + * send out a `100 Continue` response and continue reading the request entity. + * The application can therefore determine itself whether it wants the client to send the request entity + * by deciding whether to look at the request entity data stream or not. + * If the application sends a response *without* having looked at the request entity the client receives this + * response *instead of* the `100 Continue` response and the server closes the connection afterwards. + * + * - on the client-side: + * If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until + * we've received a `100 Continue` response. + */ + case object OneHundredContinue - private[this] var errorResponse: ResponseRenderingContext = _ + final class ErrorsTo500ResponseRecovery(log: LoggingAdapter) + extends PushPullStage[ResponseRenderingContext, ResponseRenderingContext] { - override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem) + import akka.stream.stage.Context - override def onPull(ctx: Context[ResponseRenderingContext]) = - if (ctx.isFinishing) ctx.pushAndFinish(errorResponse) - else ctx.pull() + private[this] var errorResponse: ResponseRenderingContext = _ - override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) = - error match { - case NonFatal(e) ⇒ - log.error(e, "Internal server error, sending 500 response") - errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError), - closeAfterResponseCompletion = true) - ctx.absorbTermination() - case _ ⇒ ctx.fail(error) - } -} + override def onPush(elem: ResponseRenderingContext, ctx: Context[ResponseRenderingContext]) = ctx.push(elem) + + override def onPull(ctx: Context[ResponseRenderingContext]) = + if (ctx.isFinishing) ctx.pushAndFinish(errorResponse) + else ctx.pull() + + override def onUpstreamFailure(error: Throwable, ctx: Context[ResponseRenderingContext]) = + error match { + case NonFatal(e) ⇒ + log.error(e, "Internal server error, sending 500 response") + errorResponse = ResponseRenderingContext(HttpResponse(StatusCodes.InternalServerError), + closeAfterResponseCompletion = true) + ctx.absorbTermination() + case _ ⇒ ctx.fail(error) + } + } +} \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/OneHundredContinue.scala b/akka-http-core/src/main/scala/akka/http/engine/server/OneHundredContinue.scala deleted file mode 100644 index 37c58cad4f..0000000000 --- a/akka-http-core/src/main/scala/akka/http/engine/server/OneHundredContinue.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ - -package akka.http.engine.server - -import scala.annotation.tailrec -import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher } - -/** - * The `Expect: 100-continue` header has a special status in HTTP. - * It allows the client to send an `Expect: 100-continue` header with the request and then pause request sending - * (i.e. hold back sending the request entity). The server reads the request headers, determines whether it wants to - * accept the request and responds with - * - * - `417 Expectation Failed`, if it doesn't support the `100-continue` expectation - * (or if the `Expect` header contains other, unsupported expectations). - * - a `100 Continue` response, - * if it is ready to accept the request entity and the client should go ahead with sending it - * - a final response (like a 4xx to signal some client-side error - * (e.g. if the request entity length is beyond the configured limit) or a 3xx redirect) - * - * Only if the client receives a `100 Continue` response from the server is it allowed to continue sending the request - * entity. In this case it will receive another response after having completed request sending. - * So this special feature breaks the normal "one request - one response" logic of HTTP! - * It therefore requires special handling in all HTTP stacks (client- and server-side). - * - * For us this means: - * - * - on the server-side: - * After having read a `Expect: 100-continue` header with the request we package up an `HttpRequest` instance and send - * it through to the application. Only when (and if) the application then requests data from the entity stream do we - * send out a `100 Continue` response and continue reading the request entity. - * The application can therefore determine itself whether it wants the client to send the request entity - * by deciding whether to look at the request entity data stream or not. - * If the application sends a response *without* having looked at the request entity the client receives this - * response *instead of* the `100 Continue` response and the server closes the connection afterwards. - * - * - on the client-side: - * If the user adds a `Expect: 100-continue` header to the request we need to hold back sending the entity until - * we've received a `100 Continue` response. - */ -private[engine] case object OneHundredContinue - -private[engine] class OneHundredContinueSourceActor extends ActorPublisher[OneHundredContinue.type] { - private var triggered = 0 - - def receive = { - case OneHundredContinue ⇒ - triggered += 1 - tryDispatch() - - case ActorPublisherMessage.Request(_) ⇒ - tryDispatch() - - case ActorPublisherMessage.Cancel ⇒ - context.stop(self) - } - - @tailrec private def tryDispatch(): Unit = - if (triggered > 0 && totalDemand > 0) { - onNext(OneHundredContinue) - triggered -= 1 - tryDispatch() - } -} \ No newline at end of file