=htc #19741 cannot pull twice bug in OutgoingConnectionBlueprint fixed

This commit is contained in:
Johan Andrén 2016-02-10 15:13:51 +01:00
parent f042204d8b
commit dd3c9c4a97
5 changed files with 331 additions and 22 deletions

View file

@ -69,7 +69,7 @@ private[http] object OutgoingConnectionBlueprint {
import ParserOutput._
val responsePrep = Flow[List[ResponseOutput]]
.mapConcat(conforms)
.via(new ResponsePrep(parserSettings))
.via(new PrepareResponse(parserSettings))
val core = BidiFlow.fromGraph(GraphDSL.create() { implicit b
import GraphDSL.Implicits._
@ -131,11 +131,19 @@ private[http] object OutgoingConnectionBlueprint {
import ParserOutput._
private final class ResponsePrep(parserSettings: ParserSettings)
/**
* This is essentially a three state state machine, it is either 'idle' - waiting for a response to come in
* or has seen the start of a response and is waiting for either chunks followed by MessageEnd if chunked
* or just MessageEnd in the case of a strict response.
*
* For chunked responses a new substream into the response entity is opened and data is streamed there instead
* of downstream until end of chunks has been reached.
*/
private[client] final class PrepareResponse(parserSettings: ParserSettings)
extends GraphStage[FlowShape[ResponseOutput, HttpResponse]] {
private val in = Inlet[ResponseOutput]("ResponsePrep.in")
private val out = Outlet[HttpResponse]("ResponsePrep.out")
private val in = Inlet[ResponseOutput]("PrepareResponse.in")
private val out = Outlet[HttpResponse]("PrepareResponse.out")
val shape = new FlowShape(in, out)
@ -143,10 +151,15 @@ private[http] object OutgoingConnectionBlueprint {
private var entitySource: SubSourceOutlet[ResponseOutput] = _
private def entitySubstreamStarted = entitySource ne null
private def idle = this
private var completionDeferred = false
def setIdleHandlers(): Unit = {
setHandler(in, idle)
setHandler(out, idle)
if (completionDeferred) {
completeStage()
} else {
setHandler(in, idle)
setHandler(out, idle)
}
}
def onPush(): Unit = grab(in) match {
@ -165,20 +178,43 @@ private[http] object OutgoingConnectionBlueprint {
if (!entitySubstreamStarted) pull(in)
}
setIdleHandlers()
private lazy val waitForMessageEnd = new InHandler {
def onPush(): Unit = grab(in) match {
case MessageEnd setHandler(in, idle)
case other throw new IllegalStateException(s"MessageEnd expected but $other received.")
override def onDownstreamFinish(): Unit = {
// if downstream cancels while streaming entity,
// make sure we also cancel the entity source, but
// after being done with streaming the entity
if (entitySubstreamStarted) {
completionDeferred = true
} else {
completeStage()
}
}
setIdleHandlers()
// with a strict message there still is a MessageEnd to wait for
lazy val waitForMessageEnd = new InHandler with OutHandler {
def onPush(): Unit = grab(in) match {
case MessageEnd
if (isAvailable(out)) pull(in)
setIdleHandlers()
case other throw new IllegalStateException(s"MessageEnd expected but $other received.")
}
override def onPull(): Unit = {
// ignore pull as we will anyways pull when we get MessageEnd
}
}
// with a streamed entity we push the chunks into the substream
// until we reach MessageEnd
private lazy val substreamHandler = new InHandler with OutHandler {
override def onPush(): Unit = grab(in) match {
case MessageEnd
entitySource.complete()
entitySource = null
// there was a deferred pull from upstream
// while we were streaming the entity
if (isAvailable(out)) pull(in)
setIdleHandlers()
case messagePart
@ -196,18 +232,16 @@ private[http] object OutgoingConnectionBlueprint {
entitySource.fail(reason)
failStage(reason)
}
override def onDownstreamFinish(): Unit = {
entitySource.complete()
completeStage()
}
}
private def createEntity(creator: EntityCreator[ResponseOutput, ResponseEntity]): ResponseEntity = {
creator match {
case StrictEntityCreator(entity)
// upstream demanded one element, which it just got
// but we want MessageEnd as well
pull(in)
setHandler(in, waitForMessageEnd)
setHandler(out, waitForMessageEnd)
entity
case StreamedEntityCreator(creator)

View file

@ -96,8 +96,8 @@ private[http] object HttpServerBluePrint {
* entity).
*/
final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] {
val in = Inlet[RequestOutput]("RequestStartThenRunIgnore.in")
val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out")
val in = Inlet[RequestOutput]("PrepareRequests.in")
val out = Outlet[HttpRequest]("PrepareRequests.out")
override val shape: FlowShape[RequestOutput, HttpRequest] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {

View file

@ -86,7 +86,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures {
.grouped(10)
.runWith(Sink.head)
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 1.second)
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 3.second)
binding.futureValue.unbind()
}
}

View file

@ -0,0 +1,237 @@
/*
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.impl.engine.client
import akka.http.impl.engine.client.OutgoingConnectionBlueprint.PrepareResponse
import akka.http.impl.engine.parsing.ParserOutput
import akka.http.impl.engine.parsing.ParserOutput.{ StrictEntityCreator, EntityStreamError, EntityChunk, StreamedEntityCreator }
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ParserSettings
import akka.stream.{ ActorMaterializer, Attributes }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.{ TestSubscriber, TestPublisher, AkkaSpec }
import akka.util.ByteString
class PrepareResponseSpec extends AkkaSpec {
val parserSettings = ParserSettings(system)
val chunkedStart = ParserOutput.ResponseStart(
StatusCodes.OK,
HttpProtocols.`HTTP/1.1`,
List(),
StreamedEntityCreator[ParserOutput, ResponseEntity] { entityChunks
val chunks = entityChunks.collect {
case EntityChunk(chunk) chunk
case EntityStreamError(info) throw EntityStreamException(info)
}
HttpEntity.Chunked(ContentTypes.`application/octet-stream`, HttpEntity.limitableChunkSource(chunks))
},
closeRequested = false)
val strictStart = ParserOutput.ResponseStart(
StatusCodes.OK,
HttpProtocols.`HTTP/1.1`,
List(),
StrictEntityCreator(HttpEntity("body")),
closeRequested = false)
val chunk = ParserOutput.EntityChunk(HttpEntity.ChunkStreamPart("abc"))
val messageEnd = ParserOutput.MessageEnd
"The PrepareRequest stage" should {
"not lose demand that comes in while streaming entity" in {
implicit val mat = ActorMaterializer()
val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]()
val responseProbe = TestSubscriber.manualProbe[HttpResponse]
Source.fromPublisher(inProbe)
.via(new PrepareResponse(parserSettings))
.to(Sink.fromSubscriber(responseProbe))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val inSub = inProbe.expectSubscription()
val responseSub = responseProbe.expectSubscription()
responseSub.request(1)
inSub.expectRequest(1)
inSub.sendNext(chunkedStart)
val response = responseProbe.expectNext()
val entityProbe = TestSubscriber.manualProbe[ByteString]()
response.entity.dataBytes.to(Sink.fromSubscriber(entityProbe)).run()
val entitySub = entityProbe.expectSubscription()
entitySub.request(1)
inSub.expectRequest(1)
inSub.sendNext(chunk)
entityProbe.expectNext()
// now, before entity stream has completed
// there is upstream demand
responseSub.request(1)
// then chunk completes
entitySub.request(1)
inSub.expectRequest(1)
inSub.sendNext(messageEnd)
entityProbe.expectComplete()
// and that demand should go downstream
// since the chunk end was consumed by the stage
inSub.expectRequest(1)
}
"not lose demand that comes in while handling strict entity" in {
implicit val mat = ActorMaterializer()
val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]()
val responseProbe = TestSubscriber.manualProbe[HttpResponse]
Source.fromPublisher(inProbe)
.via(new PrepareResponse(parserSettings))
.to(Sink.fromSubscriber(responseProbe))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val inSub = inProbe.expectSubscription()
val responseSub = responseProbe.expectSubscription()
responseSub.request(1)
inSub.expectRequest(1)
inSub.sendNext(strictStart)
val response = responseProbe.expectNext()
// now, before the strict message has completed
// there is upstream demand
responseSub.request(1)
// then chunk completes
inSub.expectRequest(1)
inSub.sendNext(messageEnd)
// and that demand should go downstream
// since the chunk end was consumed by the stage
inSub.expectRequest(1)
}
"complete entity stream then complete stage when downstream cancels" in {
// to make it possible to cancel a big file download for example
// without downloading the entire response first
implicit val mat = ActorMaterializer()
val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]()
val responseProbe = TestSubscriber.manualProbe[HttpResponse]
Source.fromPublisher(inProbe)
.via(new PrepareResponse(parserSettings))
.to(Sink.fromSubscriber(responseProbe))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val inSub = inProbe.expectSubscription()
val responseSub = responseProbe.expectSubscription()
responseSub.request(1)
inSub.expectRequest(1)
inSub.sendNext(chunkedStart)
val response = responseProbe.expectNext()
val entityProbe = TestSubscriber.manualProbe[ByteString]()
response.entity.dataBytes.to(Sink.fromSubscriber(entityProbe)).run()
val entitySub = entityProbe.expectSubscription()
entitySub.request(1)
inSub.expectRequest(1)
inSub.sendNext(chunk)
entityProbe.expectNext()
// now before entity stream is completed,
// upstream cancels
responseSub.cancel()
entitySub.request(1)
inSub.expectRequest(1)
inSub.sendNext(messageEnd)
entityProbe.expectComplete()
inSub.expectCancellation()
}
"complete stage when downstream cancels before end of strict request has arrived" in {
implicit val mat = ActorMaterializer()
val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]()
val responseProbe = TestSubscriber.manualProbe[HttpResponse]
Source.fromPublisher(inProbe)
.via(new PrepareResponse(parserSettings))
.to(Sink.fromSubscriber(responseProbe))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val inSub = inProbe.expectSubscription()
val responseSub = responseProbe.expectSubscription()
responseSub.request(1)
inSub.expectRequest(1)
inSub.sendNext(strictStart)
val response = responseProbe.expectNext()
// now before end of message has arrived
// downstream cancels
responseSub.cancel()
// which should cancel the stage
inSub.expectCancellation()
}
"cancel entire stage when the entity stream is canceled" in {
implicit val mat = ActorMaterializer()
val inProbe = TestPublisher.manualProbe[ParserOutput.ResponseOutput]()
val responseProbe = TestSubscriber.manualProbe[HttpResponse]
Source.fromPublisher(inProbe)
.via(new PrepareResponse(parserSettings))
.to(Sink.fromSubscriber(responseProbe))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val inSub = inProbe.expectSubscription()
val responseSub = responseProbe.expectSubscription()
responseSub.request(1)
inSub.expectRequest(1)
inSub.sendNext(chunkedStart)
val response = responseProbe.expectNext()
val entityProbe = TestSubscriber.manualProbe[ByteString]()
response.entity.dataBytes.to(Sink.fromSubscriber(entityProbe)).run()
val entitySub = entityProbe.expectSubscription()
entitySub.request(1)
inSub.expectRequest(1)
inSub.sendNext(chunk)
entityProbe.expectNext()
// now, before entity stream has completed
// it is cancelled
entitySub.cancel()
// this means that the entire stage should
// cancel
inSub.expectCancellation()
}
}
}

View file

@ -31,8 +31,8 @@ class PrepareRequestsSpec extends AkkaSpec {
}
HttpEntity.Chunked(ContentTypes.`application/octet-stream`, HttpEntity.limitableChunkSource(chunks))
},
true,
false)
expect100Continue = true,
closeRequested = false)
val chunkPart =
ParserOutput.EntityChunk(HttpEntity.ChunkStreamPart(ByteString("abc")))
@ -203,6 +203,44 @@ class PrepareRequestsSpec extends AkkaSpec {
upstreamProbe.expectComplete()
}
"cancel the stage when the entity stream is canceled" in {
implicit val materializer = ActorMaterializer()
val inProbe = TestPublisher.manualProbe[ParserOutput.RequestOutput]()
val upstreamProbe = TestSubscriber.manualProbe[HttpRequest]()
val stage = Flow.fromGraph(new PrepareRequests(ServerSettings(system)))
Source.fromPublisher(inProbe)
.via(stage)
.to(Sink.fromSubscriber(upstreamProbe))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val upstreamSub = upstreamProbe.expectSubscription()
val inSub = inProbe.expectSubscription()
// let request with streamed entity through
upstreamSub.request(1)
inSub.expectRequest(1)
inSub.sendNext(chunkedStart)
val request = upstreamProbe.expectNext()
// and subscribe to it's streamed entity
val entityProbe = TestSubscriber.manualProbe[ByteString]()
request.entity.dataBytes.to(Sink.fromSubscriber(entityProbe))
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
val entitySub = entityProbe.expectSubscription()
// user logic cancels entity stream
entitySub.cancel()
inSub.expectCancellation()
}
}
}