#19553 One2OneBidi should report truncation also if wrapped flow cancels early

This commit is contained in:
Endre Sándor Varga 2016-02-04 12:40:29 +01:00
parent 0170280fc0
commit 8e37ff42f5
4 changed files with 21 additions and 6 deletions

View file

@ -6,7 +6,7 @@ package akka.http.impl.engine.client
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.{ FlowShape, ActorMaterializer }
import akka.stream.{ ActorMaterializerSettings, FlowShape, ActorMaterializer }
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import akka.http.scaladsl.{ Http, TestUtils }
@ -15,7 +15,7 @@ import akka.stream.testkit.Utils
import org.scalatest.concurrent.ScalaFutures
class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures {
implicit val materializer = ActorMaterializer()
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withFuzzing(true))
implicit val patience = PatienceConfig(1.second)
"The connection-level client implementation" should {

View file

@ -288,7 +288,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
val error @ EntityStreamException(info) = probe.expectError()
info.summary shouldEqual "Illegal chunk termination"
responses.expectComplete()
responses.expectError()
netOut.expectComplete()
requestsSub.expectCancellation()
netInSub.expectCancellation()

View file

@ -36,6 +36,11 @@ class One2OneBidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(test(f), 1.second)
}
"trigger an `OutputTruncationException` if the wrapped stream cancels early" in {
val f = One2OneBidiFlow[Int, Int](-1) join Flow[Int].take(2)
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(test(f), 1.second)
}
"trigger an `UnexpectedOutputException` if the wrapped stream produces out-of-order elements" in new Test() {
inIn.sendNext(1)
inOut.requestNext() should ===(1)

View file

@ -22,7 +22,9 @@ object One2OneBidiFlow {
* consumed the respective input element.
* 2. triggers an `OutputTruncationException` if the inner flow completes before having produced an output element
* for every input element.
* 3. Backpressures the input side if the maximum number of pending output elements has been reached,
* 3. triggers an `OutputTruncationException` if the inner flow cancels its inputs before the upstream completes its
* stream of inputs.
* 4. Backpressures the input side if the maximum number of pending output elements has been reached,
* which is given via the ``maxPending`` parameter. You can use -1 to disable this feature.
*/
def apply[I, O](maxPending: Int): BidiFlow[I, I, O, O, NotUsed] =
@ -43,6 +45,11 @@ object One2OneBidiFlow {
private var pending = 0
private var pullSuppressed = false
// If the inner flow cancelled the upstream before the upstream finished, we still want to treat it as a truncation
// since upstream elements might possibly been lost/ignored (although we don't know for sure, since there is a
// race with the upstream completion and downstream cancellattion)
private var innerFlowCancelled = false
setHandler(inIn, new InHandler {
override def onPush(): Unit = {
pending += 1
@ -55,7 +62,10 @@ object One2OneBidiFlow {
override def onPull(): Unit =
if (pending < maxPending || maxPending == -1) pull(inIn)
else pullSuppressed = true
override def onDownstreamFinish(): Unit = cancel(inIn)
override def onDownstreamFinish(): Unit = {
if (!isClosed(inIn)) innerFlowCancelled = true
cancel(inIn)
}
})
setHandler(outIn, new InHandler {
@ -71,7 +81,7 @@ object One2OneBidiFlow {
} else throw new UnexpectedOutputException(element)
}
override def onUpstreamFinish(): Unit =
if (pending == 0) complete(outOut)
if (pending == 0 && !innerFlowCancelled) complete(outOut)
else throw OutputTruncationException
})