diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala index b1fd45aeb3..7a28d9f574 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala @@ -81,18 +81,25 @@ class FutureFlattenSourceSpec extends StreamSpec { "handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped { val sourcePromise = Promise[Source[Int, String]]() - val (sourceMatVal, termination) = + val probe = TestSubscriber.probe[Int]() + val sourceMatVal = Source.fromFutureSource(sourcePromise.future) - .watchTermination()(Keep.both) - .to(Sink.cancelled) + .toMat(Sink.fromSubscriber(probe))(Keep.left) .run() // wait for cancellation to occur - termination.futureValue should ===(Done) + probe.ensureSubscription() + probe.request(1) + probe.cancel() + + // try to avoid a race between probe cancel and completing the promise + Thread.sleep(100) // even though canceled the underlying matval should arrive sourcePromise.success(underlying) - sourceMatVal.futureValue should ===("foo") + val failure = sourceMatVal.failed.futureValue + failure shouldBe a[StreamDetachedException] + failure.getMessage should ===("Stream cancelled before Source Future completed") } "fail if the underlying Future is failed" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala b/akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala index 0fb53554e7..2b587860ef 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamDetachedException.scala @@ -14,5 +14,4 @@ final class StreamDetachedException(message: String) with NoStackTrace { def this() = this("Stream is terminated. Materialized value is detached.") - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 586a6f5055..be12d996f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -22,6 +22,7 @@ import scala.annotation.unchecked.uncheckedVariance import scala.util.{ Failure, Success, Try } import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } +import scala.util.control.NonFatal /** * INTERNAL API @@ -319,15 +320,11 @@ import scala.concurrent.{ Future, Promise } override def onDownstreamFinish(): Unit = { if (!materialized.isCompleted) { - // make sure we always yield the matval if possible, even if downstream cancelled - // before the source was materialized - val matValFuture = futureSource.map { gr ⇒ - // downstream finish means it cancelled, so we push that signal through into the future materialized source - Source.fromGraph(gr).to(Sink.cancelled) - .withAttributes(attr) - .run()(subFusingMaterializer) - }(ExecutionContexts.sameThreadExecutionContext) - materialized.completeWith(matValFuture) + // we used to try to materialize the "inner" source here just to get + // the materialized value, but that is not safe and may cause the graph shell + // to leak/stay alive after the stage completes + + materialized.tryFailure(new StreamDetachedException("Stream cancelled before Source Future completed")) } super.onDownstreamFinish() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 607b2f1031..4208e7e133 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -182,13 +182,16 @@ object Source { /** * Streams the elements of the given future source once it successfully completes. - * If the future fails the stream is failed. + * If the [[Future]] fails the stream is failed with the exception from the future. If downstream cancels before the + * stream completes the materialized [[Future]] will be failed with a [[StreamDetachedException]]. */ def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future)) /** - * Streams the elements of an asynchronous source once its given `completion` stage completes. - * If the `completion` fails the stream is failed with that exception. + * Streams the elements of an asynchronous source once its given [[CompletionStage]] completes. + * If the [[CompletionStage]] fails the stream is failed with the exception from the future. + * If downstream cancels before the stream completes the materialized [[CompletionStage]] will be failed + * with a [[StreamDetachedException]] */ def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = new Source(scaladsl.Source.fromSourceCompletionStage(completion)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5de428a57c..c2a3521fb6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -276,13 +276,16 @@ object Source { /** * Streams the elements of the given future source once it successfully completes. - * If the future fails the stream is failed. + * If the [[Future]] fails the stream is failed with the exception from the future. If downstream cancels before the + * stream completes the materialized `Future` will be failed with a [[StreamDetachedException]] */ def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = fromGraph(new FutureFlattenSource(future)) /** * Streams the elements of an asynchronous source once its given `completion` stage completes. - * If the `completion` fails the stream is failed with that exception. + * If the [[CompletionStage]] fails the stream is failed with the exception from the future. + * If downstream cancels before the stream completes the materialized `Future` will be failed + * with a [[StreamDetachedException]] */ def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)