diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala index 6da97f2f42..6529d5f8a9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala @@ -5,7 +5,7 @@ package akka.http.impl.engine.rendering import akka.parboiled2.CharUtils -import akka.stream.{Attributes, SourceShape} +import akka.stream.{ Attributes, SourceShape } import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage @@ -108,7 +108,6 @@ private object RenderSupport { override def toString = "CheckContentLength" } - private def renderChunk(chunk: HttpEntity.ChunkStreamPart): ByteString = { import chunk._ val renderedSize = // buffer space required for rendering (without trailer) diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala b/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala index 98fb679737..8b106b67e7 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/One2OneBidiFlow.scala @@ -36,65 +36,64 @@ private[http] object One2OneBidiFlow { def apply[I, O](maxPending: Int): BidiFlow[I, I, O, O, NotUsed] = BidiFlow.fromGraph(new One2OneBidi[I, O](maxPending)) + /* + * +--------------------+ + * ~> | in toWrapped | ~> + * | | + * <~ | out fromWrapped | <~ + * +--------------------+ + */ class One2OneBidi[I, O](maxPending: Int) extends GraphStage[BidiShape[I, I, O, O]] { - val inIn = Inlet[I]("inIn") - val inOut = Outlet[I]("inOut") - val outIn = Inlet[O]("outIn") - val outOut = Outlet[O]("outOut") + val in = Inlet[I]("One2OneBidi.in") + val out = Outlet[O]("One2OneBidi.out") + val toWrapped = Outlet[I]("One2OneBidi.toWrapped") + val fromWrapped = Inlet[O]("One2OneBidi.fromWrapped") override def initialAttributes = Attributes.name("One2OneBidi") - val shape = BidiShape(inIn, inOut, outIn, outOut) + val shape = BidiShape(in, toWrapped, fromWrapped, out) override def toString = "One2OneBidi" override def createLogic(effectiveAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - private var pending = 0 + private var insideWrappedFlow = 0 private var pullSuppressed = false - // If the inner flow cancelled the upstream before the upstream finished, we still want to treat it as a truncation - // since upstream elements might possibly been lost/ignored (although we don't know for sure, since there is a - // race with the upstream completion and downstream cancellattion) - private var innerFlowCancelled = false - - setHandler(inIn, new InHandler { + setHandler(in, new InHandler { override def onPush(): Unit = { - pending += 1 - push(inOut, grab(inIn)) + insideWrappedFlow += 1 + push(toWrapped, grab(in)) } - override def onUpstreamFinish(): Unit = complete(inOut) + override def onUpstreamFinish(): Unit = complete(toWrapped) }) - setHandler(inOut, new OutHandler { + setHandler(toWrapped, new OutHandler { override def onPull(): Unit = - if (pending < maxPending || maxPending == -1) pull(inIn) + if (insideWrappedFlow < maxPending || maxPending == -1) pull(in) else pullSuppressed = true - override def onDownstreamFinish(): Unit = { - if (!isClosed(inIn)) innerFlowCancelled = true - cancel(inIn) - } + override def onDownstreamFinish(): Unit = cancel(in) }) - setHandler(outIn, new InHandler { + setHandler(fromWrapped, new InHandler { override def onPush(): Unit = { - val element = grab(outIn) - if (pending > 0) { - pending -= 1 - push(outOut, element) + val element = grab(fromWrapped) + if (insideWrappedFlow > 0) { + insideWrappedFlow -= 1 + push(out, element) if (pullSuppressed) { pullSuppressed = false - pull(inIn) + pull(in) } } else throw new UnexpectedOutputException(element) } override def onUpstreamFinish(): Unit = { - if (pending == 0 && isClosed(inIn) && !innerFlowCancelled) complete(outOut) - else throw OutputTruncationException + if (insideWrappedFlow > 0) throw OutputTruncationException + else completeStage() } }) - setHandler(outOut, new OutHandler { - override def onPull(): Unit = pull(outIn) - override def onDownstreamFinish(): Unit = cancel(outIn) + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(fromWrapped) + override def onDownstreamFinish(): Unit = cancel(fromWrapped) }) } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala index a58932e34f..0bc7cca0b7 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -4,11 +4,13 @@ package akka.http.impl.engine.client +import java.util.concurrent.CountDownLatch + import akka.http.impl.util.One2OneBidiFlow -import scala.concurrent.Await +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import akka.stream.{ ActorMaterializerSettings, FlowShape, ActorMaterializer } +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, FlowShape, OverflowStrategy } import akka.stream.scaladsl._ import akka.testkit.AkkaSpec import akka.http.scaladsl.{ Http, TestUtils } @@ -73,23 +75,5 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec { binding.futureValue.unbind() } - "catch response stream truncation" in Utils.assertAllStagesStopped { - val (_, serverHostName, serverPort) = TestUtils.temporaryServerHostnameAndPort() - - val binding = Http().bindAndHandleSync({ - case HttpRequest(_, Uri.Path("/b"), _, _, _) ⇒ HttpResponse(headers = List(headers.Connection("close"))) - case _ ⇒ HttpResponse() - }, serverHostName, serverPort) - - val x = Source(List("/a", "/b", "/c")) - .map(path ⇒ HttpRequest(uri = path)) - .via(Http().outgoingConnection(serverHostName, serverPort)) - .grouped(10) - .runWith(Sink.head) - - a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(x, 3.second) - binding.futureValue.unbind() - } - } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index 4c903c3cf7..6d2ad2af15 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -359,7 +359,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. val error @ EntityStreamException(info) = probe.expectError() info.summary shouldEqual "Illegal chunk termination" - responses.expectError() + responses.expectComplete() netOut.expectComplete() requestsSub.expectCancellation() netInSub.expectCancellation() diff --git a/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala index 64a60f33a0..35093d7d74 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/util/One2OneBidiFlowSpec.scala @@ -55,8 +55,8 @@ class One2OneBidiFlowSpec extends AkkaSpec { upstreamProbe.sendNext(1) flowInProbe.expectNext(1) - flowOutProbe.sendNext(1) - downstreamProbe.expectNext(1) + + // Request is now in the wrapped flow but no reply has been returned at this point, this is a clear truncation flowOutProbe.sendComplete() upstreamProbe.expectCancellation() @@ -64,39 +64,6 @@ class One2OneBidiFlowSpec extends AkkaSpec { downstreamProbe.expectError(One2OneBidiFlow.OutputTruncationException) } - "trigger an `OutputTruncationException` if the wrapped stream cancels early" in assertAllStagesStopped { - val flowInProbe = TestSubscriber.probe[Int]() - val flowOutProbe = TestPublisher.probe[Int]() - - val testSetup = One2OneBidiFlow[Int, Int](-1) join Flow.fromSinkAndSource( - Sink.fromSubscriber(flowInProbe), - Source.fromPublisher(flowOutProbe)) - - val upstreamProbe = TestPublisher.probe[Int]() - val downstreamProbe = TestSubscriber.probe[Int]() - - Source.fromPublisher(upstreamProbe).via(testSetup).runWith(Sink.fromSubscriber(downstreamProbe)) - - upstreamProbe.ensureSubscription() - downstreamProbe.ensureSubscription() - flowInProbe.ensureSubscription() - flowOutProbe.ensureSubscription() - - downstreamProbe.request(1) - flowInProbe.request(1) - - upstreamProbe.sendNext(1) - flowInProbe.expectNext(1) - flowOutProbe.sendNext(1) - downstreamProbe.expectNext(1) - - flowInProbe.cancel() - upstreamProbe.expectCancellation() - - flowOutProbe.sendComplete() - downstreamProbe.expectError(One2OneBidiFlow.OutputTruncationException) - } - "trigger an `UnexpectedOutputException` if the wrapped stream produces out-of-order elements" in assertAllStagesStopped { new Test() { inIn.sendNext(1) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/TightRequestTimeoutSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/TightRequestTimeoutSpec.scala index 2002db3dba..1902df0cc2 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/TightRequestTimeoutSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/TightRequestTimeoutSpec.scala @@ -59,9 +59,6 @@ class TightRequestTimeoutSpec extends WordSpec with Matchers with BeforeAndAfter val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/")).futureValue response.status should ===(StatusCodes.ServiceUnavailable) // the timeout response - p.expectMsgPF(hint = "Expected truncation error") { - case Logging.Error(_, _, _, msg: String) if msg contains "Inner stream finished before inputs completed." ⇒ () - } p.expectNoMsg(1.second) // here the double push might happen binding.flatMap(_.unbind()).futureValue diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0d6a7ec994..4915d9127d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -28,7 +28,7 @@ import scala.compat.java8.FutureConverters._ * a Reactive Streams `Publisher` (at least conceptually). */ final class Source[+Out, +Mat](private[stream] override val module: Module) - extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { + extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { override type Repr[+O] = Source[O, Mat @uncheckedVariance] override type ReprMat[+O, +M] = Source[O, M]