diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala index b553f2bb7b..4ef39d34ed 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -6,7 +6,7 @@ package akka.http.impl.engine.client import scala.concurrent.Await import scala.concurrent.duration._ -import akka.stream.{ FlowShape, ActorMaterializer } +import akka.stream.{ ActorMaterializerSettings, FlowShape, ActorMaterializer } import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec import akka.http.scaladsl.{ Http, TestUtils } @@ -15,7 +15,7 @@ import akka.stream.testkit.Utils import org.scalatest.concurrent.ScalaFutures class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures { - implicit val materializer = ActorMaterializer() + implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withFuzzing(true)) implicit val patience = PatienceConfig(1.second) "The connection-level client implementation" should { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala index 01ba290723..6e53fdda39 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/LowLevelOutgoingConnectionSpec.scala @@ -359,7 +359,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka. val error @ EntityStreamException(info) = probe.expectError() info.summary shouldEqual "Illegal chunk termination" - responses.expectComplete() + responses.expectError() netOut.expectComplete() requestsSub.expectCancellation() netInSub.expectCancellation() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/One2OneBidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/One2OneBidiFlowSpec.scala index 1c3881d2ab..6d876b184b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/One2OneBidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/One2OneBidiFlowSpec.scala @@ -36,6 +36,11 @@ class One2OneBidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(test(f), 1.second) } + "trigger an `OutputTruncationException` if the wrapped stream cancels early" in { + val f = One2OneBidiFlow[Int, Int](-1) join Flow[Int].take(2) + a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(test(f), 1.second) + } + "trigger an `UnexpectedOutputException` if the wrapped stream produces out-of-order elements" in new Test() { inIn.sendNext(1) inOut.requestNext() should ===(1) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala index d78e1dba00..30516d8ad1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/One2OneBidiFlow.scala @@ -22,7 +22,9 @@ object One2OneBidiFlow { * consumed the respective input element. * 2. triggers an `OutputTruncationException` if the inner flow completes before having produced an output element * for every input element. - * 3. Backpressures the input side if the maximum number of pending output elements has been reached, + * 3. triggers an `OutputTruncationException` if the inner flow cancels its inputs before the upstream completes its + * stream of inputs. + * 4. Backpressures the input side if the maximum number of pending output elements has been reached, * which is given via the ``maxPending`` parameter. You can use -1 to disable this feature. */ def apply[I, O](maxPending: Int): BidiFlow[I, I, O, O, NotUsed] = @@ -43,6 +45,11 @@ object One2OneBidiFlow { private var pending = 0 private var pullSuppressed = false + // If the inner flow cancelled the upstream before the upstream finished, we still want to treat it as a truncation + // since upstream elements might possibly been lost/ignored (although we don't know for sure, since there is a + // race with the upstream completion and downstream cancellattion) + private var innerFlowCancelled = false + setHandler(inIn, new InHandler { override def onPush(): Unit = { pending += 1 @@ -55,7 +62,10 @@ object One2OneBidiFlow { override def onPull(): Unit = if (pending < maxPending || maxPending == -1) pull(inIn) else pullSuppressed = true - override def onDownstreamFinish(): Unit = cancel(inIn) + override def onDownstreamFinish(): Unit = { + if (!isClosed(inIn)) innerFlowCancelled = true + cancel(inIn) + } }) setHandler(outIn, new InHandler { @@ -71,7 +81,7 @@ object One2OneBidiFlow { } else throw new UnexpectedOutputException(element) } override def onUpstreamFinish(): Unit = - if (pending == 0) complete(outOut) + if (pending == 0 && !innerFlowCancelled) complete(outOut) else throw OutputTruncationException })