Provide materialized value even if downstream cancels.

This commit is contained in:
Johan Andrén 2017-02-27 17:13:02 +01:00
parent 2dc7fcd651
commit d6bf003127
2 changed files with 14 additions and 5 deletions

View file

@ -51,7 +51,7 @@ class FutureFlattenSourceSpec extends StreamSpec {
}
"handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped {
val sourcePromise = Promise[Source[Int, Int]]()
val sourcePromise = Promise[Source[Int, String]]()
val (sourceMatVal, termination) =
Source.fromFutureSource(sourcePromise.future)
@ -61,7 +61,10 @@ class FutureFlattenSourceSpec extends StreamSpec {
// wait for cancellation to occur
termination.futureValue should ===(Done)
sourceMatVal.failed.futureValue.getMessage should ===("Downstream cancelled before future source completed")
// even though canceled the underlying matval should arrive
sourcePromise.success(underlying)
sourceMatVal.futureValue should ===("foo")
}
"fail if the underlying Future is failed" in assertAllStagesStopped {

View file

@ -303,7 +303,15 @@ object GraphStages {
def onPull(): Unit = {}
override def onDownstreamFinish(): Unit = {
materialized.tryFailure(new RuntimeException("Downstream cancelled before future source completed"))
if (!materialized.isCompleted) {
// make sure we always yield the matval if possible, even if downstream cancelled
// before the source was materialized
val matValFuture = future.map { gr
val runnable = Source.fromGraph(gr).to(Sink.ignore)
interpreter.subFusingMaterializer.materialize(runnable, initialAttributes = attr)
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
materialized.completeWith(matValFuture)
}
super.onDownstreamFinish()
}
})
@ -320,8 +328,6 @@ object GraphStages {
completeStage()
override def postStop(): Unit = {
// I don't think this can happen, but just to be sure we don't leave the matval promise unfulfilled
materialized.tryFailure(new RuntimeException("FutureFlattenSource stage stopped without materialization of inner source completing"))
if (!sinkIn.isClosed) sinkIn.cancel()
}