Merge pull request #19690 from drewhk/wip-19553-one2onebidi-cancellation-drewhk

#19533 One2OneBidi should report truncation also if wrapped flow cancels early
This commit is contained in:
drewhk 2016-02-10 12:28:12 +01:00
commit b19b94071e
4 changed files with 21 additions and 6 deletions

View file

@ -6,7 +6,7 @@ package akka.http.impl.engine.client
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.stream.{ FlowShape, ActorMaterializer } import akka.stream.{ ActorMaterializerSettings, FlowShape, ActorMaterializer }
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import akka.http.scaladsl.{ Http, TestUtils } import akka.http.scaladsl.{ Http, TestUtils }
@ -15,7 +15,7 @@ import akka.stream.testkit.Utils
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures { class HighLevelOutgoingConnectionSpec extends AkkaSpec with ScalaFutures {
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withFuzzing(true))
implicit val patience = PatienceConfig(1.second) implicit val patience = PatienceConfig(1.second)
"The connection-level client implementation" should { "The connection-level client implementation" should {

View file

@ -359,7 +359,7 @@ class LowLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka.
val error @ EntityStreamException(info) = probe.expectError() val error @ EntityStreamException(info) = probe.expectError()
info.summary shouldEqual "Illegal chunk termination" info.summary shouldEqual "Illegal chunk termination"
responses.expectComplete() responses.expectError()
netOut.expectComplete() netOut.expectComplete()
requestsSub.expectCancellation() requestsSub.expectCancellation()
netInSub.expectCancellation() netInSub.expectCancellation()

View file

@ -36,6 +36,11 @@ class One2OneBidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(test(f), 1.second) a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(test(f), 1.second)
} }
"trigger an `OutputTruncationException` if the wrapped stream cancels early" in {
val f = One2OneBidiFlow[Int, Int](-1) join Flow[Int].take(2)
a[One2OneBidiFlow.OutputTruncationException.type] should be thrownBy Await.result(test(f), 1.second)
}
"trigger an `UnexpectedOutputException` if the wrapped stream produces out-of-order elements" in new Test() { "trigger an `UnexpectedOutputException` if the wrapped stream produces out-of-order elements" in new Test() {
inIn.sendNext(1) inIn.sendNext(1)
inOut.requestNext() should ===(1) inOut.requestNext() should ===(1)

View file

@ -22,7 +22,9 @@ object One2OneBidiFlow {
* consumed the respective input element. * consumed the respective input element.
* 2. triggers an `OutputTruncationException` if the inner flow completes before having produced an output element * 2. triggers an `OutputTruncationException` if the inner flow completes before having produced an output element
* for every input element. * for every input element.
* 3. Backpressures the input side if the maximum number of pending output elements has been reached, * 3. triggers an `OutputTruncationException` if the inner flow cancels its inputs before the upstream completes its
* stream of inputs.
* 4. Backpressures the input side if the maximum number of pending output elements has been reached,
* which is given via the ``maxPending`` parameter. You can use -1 to disable this feature. * which is given via the ``maxPending`` parameter. You can use -1 to disable this feature.
*/ */
def apply[I, O](maxPending: Int): BidiFlow[I, I, O, O, NotUsed] = def apply[I, O](maxPending: Int): BidiFlow[I, I, O, O, NotUsed] =
@ -43,6 +45,11 @@ object One2OneBidiFlow {
private var pending = 0 private var pending = 0
private var pullSuppressed = false private var pullSuppressed = false
// If the inner flow cancelled the upstream before the upstream finished, we still want to treat it as a truncation
// since upstream elements might possibly been lost/ignored (although we don't know for sure, since there is a
// race with the upstream completion and downstream cancellattion)
private var innerFlowCancelled = false
setHandler(inIn, new InHandler { setHandler(inIn, new InHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
pending += 1 pending += 1
@ -55,7 +62,10 @@ object One2OneBidiFlow {
override def onPull(): Unit = override def onPull(): Unit =
if (pending < maxPending || maxPending == -1) pull(inIn) if (pending < maxPending || maxPending == -1) pull(inIn)
else pullSuppressed = true else pullSuppressed = true
override def onDownstreamFinish(): Unit = cancel(inIn) override def onDownstreamFinish(): Unit = {
if (!isClosed(inIn)) innerFlowCancelled = true
cancel(inIn)
}
}) })
setHandler(outIn, new InHandler { setHandler(outIn, new InHandler {
@ -71,7 +81,7 @@ object One2OneBidiFlow {
} else throw new UnexpectedOutputException(element) } else throw new UnexpectedOutputException(element)
} }
override def onUpstreamFinish(): Unit = override def onUpstreamFinish(): Unit =
if (pending == 0) complete(outOut) if (pending == 0 && !innerFlowCancelled) complete(outOut)
else throw OutputTruncationException else throw OutputTruncationException
}) })