From d6bf00312765c70b2a3bede33c1cd8a740dceb06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Mon, 27 Feb 2017 17:13:02 +0100 Subject: [PATCH] Provide materialized value even if downstream cancels. --- .../stream/scaladsl/FutureFlattenSourceSpec.scala | 7 +++++-- .../scala/akka/stream/impl/fusing/GraphStages.scala | 12 +++++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala index ddb15501d1..810150a2dd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala @@ -51,7 +51,7 @@ class FutureFlattenSourceSpec extends StreamSpec { } "handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped { - val sourcePromise = Promise[Source[Int, Int]]() + val sourcePromise = Promise[Source[Int, String]]() val (sourceMatVal, termination) = Source.fromFutureSource(sourcePromise.future) @@ -61,7 +61,10 @@ class FutureFlattenSourceSpec extends StreamSpec { // wait for cancellation to occur termination.futureValue should ===(Done) - sourceMatVal.failed.futureValue.getMessage should ===("Downstream cancelled before future source completed") + + // even though canceled the underlying matval should arrive + sourcePromise.success(underlying) + sourceMatVal.futureValue should ===("foo") } "fail if the underlying Future is failed" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index d52279fe81..dc70e77d92 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -303,7 +303,15 @@ object GraphStages { def onPull(): Unit = {} override def onDownstreamFinish(): Unit = { - materialized.tryFailure(new RuntimeException("Downstream cancelled before future source completed")) + if (!materialized.isCompleted) { + // make sure we always yield the matval if possible, even if downstream cancelled + // before the source was materialized + val matValFuture = future.map { gr ⇒ + val runnable = Source.fromGraph(gr).to(Sink.ignore) + interpreter.subFusingMaterializer.materialize(runnable, initialAttributes = attr) + }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + materialized.completeWith(matValFuture) + } super.onDownstreamFinish() } }) @@ -320,8 +328,6 @@ object GraphStages { completeStage() override def postStop(): Unit = { - // I don't think this can happen, but just to be sure we don't leave the matval promise unfulfilled - materialized.tryFailure(new RuntimeException("FutureFlattenSource stage stopped without materialization of inner source completing")) if (!sinkIn.isClosed) sinkIn.cancel() }