diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 78a423dfb9..35da511010 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -69,7 +69,7 @@ private[http] object OutgoingConnectionBlueprint { import ParserOutput._ val responsePrep = Flow[List[ResponseOutput]] .mapConcat(conforms) - .via(new ResponsePrep(parserSettings)) + .via(new PrepareResponse(parserSettings)) val core = BidiFlow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ @@ -131,11 +131,19 @@ private[http] object OutgoingConnectionBlueprint { import ParserOutput._ - private final class ResponsePrep(parserSettings: ParserSettings) + /** + * This is essentially a three state state machine, it is either 'idle' - waiting for a response to come in + * or has seen the start of a response and is waiting for either chunks followed by MessageEnd if chunked + * or just MessageEnd in the case of a strict response. + * + * For chunked responses a new substream into the response entity is opened and data is streamed there instead + * of downstream until end of chunks has been reached. + */ + private[client] final class PrepareResponse(parserSettings: ParserSettings) extends GraphStage[FlowShape[ResponseOutput, HttpResponse]] { - private val in = Inlet[ResponseOutput]("ResponsePrep.in") - private val out = Outlet[HttpResponse]("ResponsePrep.out") + private val in = Inlet[ResponseOutput]("PrepareResponse.in") + private val out = Outlet[HttpResponse]("PrepareResponse.out") val shape = new FlowShape(in, out) @@ -143,10 +151,15 @@ private[http] object OutgoingConnectionBlueprint { private var entitySource: SubSourceOutlet[ResponseOutput] = _ private def entitySubstreamStarted = entitySource ne null private def idle = this + private var completionDeferred = false def setIdleHandlers(): Unit = { - setHandler(in, idle) - setHandler(out, idle) + if (completionDeferred) { + completeStage() + } else { + setHandler(in, idle) + setHandler(out, idle) + } } def onPush(): Unit = grab(in) match { @@ -165,20 +178,43 @@ private[http] object OutgoingConnectionBlueprint { if (!entitySubstreamStarted) pull(in) } - setIdleHandlers() - - private lazy val waitForMessageEnd = new InHandler { - def onPush(): Unit = grab(in) match { - case MessageEnd ⇒ setHandler(in, idle) - case other ⇒ throw new IllegalStateException(s"MessageEnd expected but $other received.") + override def onDownstreamFinish(): Unit = { + // if downstream cancels while streaming entity, + // make sure we also cancel the entity source, but + // after being done with streaming the entity + if (entitySubstreamStarted) { + completionDeferred = true + } else { + completeStage() } } + setIdleHandlers() + + // with a strict message there still is a MessageEnd to wait for + lazy val waitForMessageEnd = new InHandler with OutHandler { + def onPush(): Unit = grab(in) match { + case MessageEnd ⇒ + if (isAvailable(out)) pull(in) + setIdleHandlers() + case other ⇒ throw new IllegalStateException(s"MessageEnd expected but $other received.") + } + + override def onPull(): Unit = { + // ignore pull as we will anyways pull when we get MessageEnd + } + } + + // with a streamed entity we push the chunks into the substream + // until we reach MessageEnd private lazy val substreamHandler = new InHandler with OutHandler { override def onPush(): Unit = grab(in) match { case MessageEnd ⇒ entitySource.complete() entitySource = null + // there was a deferred pull from upstream + // while we were streaming the entity + if (isAvailable(out)) pull(in) setIdleHandlers() case messagePart ⇒ @@ -196,18 +232,16 @@ private[http] object OutgoingConnectionBlueprint { entitySource.fail(reason) failStage(reason) } - - override def onDownstreamFinish(): Unit = { - entitySource.complete() - completeStage() - } } private def createEntity(creator: EntityCreator[ResponseOutput, ResponseEntity]): ResponseEntity = { creator match { case StrictEntityCreator(entity) ⇒ + // upstream demanded one element, which it just got + // but we want MessageEnd as well pull(in) setHandler(in, waitForMessageEnd) + setHandler(out, waitForMessageEnd) entity case StreamedEntityCreator(creator) ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 679cde6793..0d67ed92cc 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -96,8 +96,8 @@ private[http] object HttpServerBluePrint { * entity). */ final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] { - val in = Inlet[RequestOutput]("RequestStartThenRunIgnore.in") - val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out") + val in = Inlet[RequestOutput]("PrepareRequests.in") + val out = Outlet[HttpRequest]("PrepareRequests.out") override val shape: FlowShape[RequestOutput, HttpRequest] = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala index 4ef39d34ed..ea2cfea1f5 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -86,7 +86,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures { .grouped(10) .runWith(Sink.head) - a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 1.second) + a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 3.second) binding.futureValue.unbind() } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/PrepareResponseSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/PrepareResponseSpec.scala new file mode 100644 index 0000000000..89b84be2eb --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/PrepareResponseSpec.scala @@ -0,0 +1,237 @@ +/* + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.http.impl.engine.client + +import akka.http.impl.engine.client.OutgoingConnectionBlueprint.PrepareResponse +import akka.http.impl.engine.parsing.ParserOutput +import akka.http.impl.engine.parsing.ParserOutput.{ StrictEntityCreator, EntityStreamError, EntityChunk, StreamedEntityCreator } +import akka.http.scaladsl.model._ +import akka.http.scaladsl.settings.ParserSettings +import akka.stream.{ ActorMaterializer, Attributes } +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.testkit.{ TestSubscriber, TestPublisher, AkkaSpec } +import akka.util.ByteString + +class PrepareResponseSpec extends AkkaSpec { + + val parserSettings = ParserSettings(system) + + val chunkedStart = ParserOutput.ResponseStart( + StatusCodes.OK, + HttpProtocols.`HTTP/1.1`, + List(), + StreamedEntityCreator[ParserOutput, ResponseEntity] { entityChunks ⇒ + val chunks = entityChunks.collect { + case EntityChunk(chunk) ⇒ chunk + case EntityStreamError(info) ⇒ throw EntityStreamException(info) + } + HttpEntity.Chunked(ContentTypes.`application/octet-stream`, HttpEntity.limitableChunkSource(chunks)) + }, + closeRequested = false) + + val strictStart = ParserOutput.ResponseStart( + StatusCodes.OK, + HttpProtocols.`HTTP/1.1`, + List(), + StrictEntityCreator(HttpEntity("body")), + closeRequested = false) + + val chunk = ParserOutput.EntityChunk(HttpEntity.ChunkStreamPart("abc")) + + val messageEnd = ParserOutput.MessageEnd + + "The PrepareRequest stage" should { + + "not lose demand that comes in while streaming entity" in { + implicit val mat = ActorMaterializer() + + val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]() + val responseProbe = TestSubscriber.manualProbe[HttpResponse] + + Source.fromPublisher(inProbe) + .via(new PrepareResponse(parserSettings)) + .to(Sink.fromSubscriber(responseProbe)) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val inSub = inProbe.expectSubscription() + val responseSub = responseProbe.expectSubscription() + + responseSub.request(1) + inSub.expectRequest(1) + inSub.sendNext(chunkedStart) + val response = responseProbe.expectNext() + + val entityProbe = TestSubscriber.manualProbe[ByteString]() + response.entity.dataBytes.to(Sink.fromSubscriber(entityProbe)).run() + val entitySub = entityProbe.expectSubscription() + + entitySub.request(1) + inSub.expectRequest(1) + inSub.sendNext(chunk) + entityProbe.expectNext() + + // now, before entity stream has completed + // there is upstream demand + responseSub.request(1) + + // then chunk completes + entitySub.request(1) + inSub.expectRequest(1) + inSub.sendNext(messageEnd) + entityProbe.expectComplete() + + // and that demand should go downstream + // since the chunk end was consumed by the stage + inSub.expectRequest(1) + + } + + "not lose demand that comes in while handling strict entity" in { + implicit val mat = ActorMaterializer() + + val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]() + val responseProbe = TestSubscriber.manualProbe[HttpResponse] + + Source.fromPublisher(inProbe) + .via(new PrepareResponse(parserSettings)) + .to(Sink.fromSubscriber(responseProbe)) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val inSub = inProbe.expectSubscription() + val responseSub = responseProbe.expectSubscription() + + responseSub.request(1) + inSub.expectRequest(1) + inSub.sendNext(strictStart) + val response = responseProbe.expectNext() + + // now, before the strict message has completed + // there is upstream demand + responseSub.request(1) + + // then chunk completes + inSub.expectRequest(1) + inSub.sendNext(messageEnd) + + // and that demand should go downstream + // since the chunk end was consumed by the stage + inSub.expectRequest(1) + + } + + "complete entity stream then complete stage when downstream cancels" in { + // to make it possible to cancel a big file download for example + // without downloading the entire response first + implicit val mat = ActorMaterializer() + + val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]() + val responseProbe = TestSubscriber.manualProbe[HttpResponse] + + Source.fromPublisher(inProbe) + .via(new PrepareResponse(parserSettings)) + .to(Sink.fromSubscriber(responseProbe)) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val inSub = inProbe.expectSubscription() + val responseSub = responseProbe.expectSubscription() + + responseSub.request(1) + inSub.expectRequest(1) + inSub.sendNext(chunkedStart) + val response = responseProbe.expectNext() + + val entityProbe = TestSubscriber.manualProbe[ByteString]() + response.entity.dataBytes.to(Sink.fromSubscriber(entityProbe)).run() + val entitySub = entityProbe.expectSubscription() + + entitySub.request(1) + inSub.expectRequest(1) + inSub.sendNext(chunk) + entityProbe.expectNext() + + // now before entity stream is completed, + // upstream cancels + responseSub.cancel() + + entitySub.request(1) + inSub.expectRequest(1) + inSub.sendNext(messageEnd) + + entityProbe.expectComplete() + inSub.expectCancellation() + } + + "complete stage when downstream cancels before end of strict request has arrived" in { + implicit val mat = ActorMaterializer() + + val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]() + val responseProbe = TestSubscriber.manualProbe[HttpResponse] + + Source.fromPublisher(inProbe) + .via(new PrepareResponse(parserSettings)) + .to(Sink.fromSubscriber(responseProbe)) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val inSub = inProbe.expectSubscription() + val responseSub = responseProbe.expectSubscription() + + responseSub.request(1) + inSub.expectRequest(1) + inSub.sendNext(strictStart) + val response = responseProbe.expectNext() + + // now before end of message has arrived + // downstream cancels + responseSub.cancel() + + // which should cancel the stage + inSub.expectCancellation() + } + + "cancel entire stage when the entity stream is canceled" in { + implicit val mat = ActorMaterializer() + + val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]() + val responseProbe = TestSubscriber.manualProbe[HttpResponse] + + Source.fromPublisher(inProbe) + .via(new PrepareResponse(parserSettings)) + .to(Sink.fromSubscriber(responseProbe)) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val inSub = inProbe.expectSubscription() + val responseSub = responseProbe.expectSubscription() + + responseSub.request(1) + inSub.expectRequest(1) + inSub.sendNext(chunkedStart) + val response = responseProbe.expectNext() + + val entityProbe = TestSubscriber.manualProbe[ByteString]() + response.entity.dataBytes.to(Sink.fromSubscriber(entityProbe)).run() + val entitySub = entityProbe.expectSubscription() + + entitySub.request(1) + inSub.expectRequest(1) + inSub.sendNext(chunk) + entityProbe.expectNext() + + // now, before entity stream has completed + // it is cancelled + entitySub.cancel() + + // this means that the entire stage should + // cancel + inSub.expectCancellation() + + } + + } + +} diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala index 67dd188642..17678b2dcb 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/PrepareRequestsSpec.scala @@ -31,8 +31,8 @@ class PrepareRequestsSpec extends AkkaSpec { } HttpEntity.Chunked(ContentTypes.`application/octet-stream`, HttpEntity.limitableChunkSource(chunks)) }, - true, - false) + expect100Continue = true, + closeRequested = false) val chunkPart = ParserOutput.EntityChunk(HttpEntity.ChunkStreamPart(ByteString("abc"))) @@ -203,6 +203,44 @@ class PrepareRequestsSpec extends AkkaSpec { upstreamProbe.expectComplete() } + + "cancel the stage when the entity stream is canceled" in { + implicit val materializer = ActorMaterializer() + + val inProbe = TestPublisher.manualProbe[ParserOutput.RequestOutput]() + val upstreamProbe = TestSubscriber.manualProbe[HttpRequest]() + + val stage = Flow.fromGraph(new PrepareRequests(ServerSettings(system))) + + Source.fromPublisher(inProbe) + .via(stage) + .to(Sink.fromSubscriber(upstreamProbe)) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val upstreamSub = upstreamProbe.expectSubscription() + val inSub = inProbe.expectSubscription() + + // let request with streamed entity through + upstreamSub.request(1) + inSub.expectRequest(1) + inSub.sendNext(chunkedStart) + + val request = upstreamProbe.expectNext() + + // and subscribe to it's streamed entity + val entityProbe = TestSubscriber.manualProbe[ByteString]() + request.entity.dataBytes.to(Sink.fromSubscriber(entityProbe)) + .withAttributes(Attributes.inputBuffer(1, 1)) + .run() + + val entitySub = entityProbe.expectSubscription() + + // user logic cancels entity stream + entitySub.cancel() + + inSub.expectCancellation() + } } }