No submaterializing inner source when outer stream already canceled (#23723)
* Do not try to submaterialize inner source when outer stream already cancelled #23656 * Use StreamDetachedException * Sleep a little * Updated exception text
This commit is contained in:
parent
edcc2b2d75
commit
6db974b6f4
5 changed files with 29 additions and 20 deletions
|
|
@ -81,18 +81,25 @@ class FutureFlattenSourceSpec extends StreamSpec {
|
||||||
"handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped {
|
"handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped {
|
||||||
val sourcePromise = Promise[Source[Int, String]]()
|
val sourcePromise = Promise[Source[Int, String]]()
|
||||||
|
|
||||||
val (sourceMatVal, termination) =
|
val probe = TestSubscriber.probe[Int]()
|
||||||
|
val sourceMatVal =
|
||||||
Source.fromFutureSource(sourcePromise.future)
|
Source.fromFutureSource(sourcePromise.future)
|
||||||
.watchTermination()(Keep.both)
|
.toMat(Sink.fromSubscriber(probe))(Keep.left)
|
||||||
.to(Sink.cancelled)
|
|
||||||
.run()
|
.run()
|
||||||
|
|
||||||
// wait for cancellation to occur
|
// wait for cancellation to occur
|
||||||
termination.futureValue should ===(Done)
|
probe.ensureSubscription()
|
||||||
|
probe.request(1)
|
||||||
|
probe.cancel()
|
||||||
|
|
||||||
|
// try to avoid a race between probe cancel and completing the promise
|
||||||
|
Thread.sleep(100)
|
||||||
|
|
||||||
// even though canceled the underlying matval should arrive
|
// even though canceled the underlying matval should arrive
|
||||||
sourcePromise.success(underlying)
|
sourcePromise.success(underlying)
|
||||||
sourceMatVal.futureValue should ===("foo")
|
val failure = sourceMatVal.failed.futureValue
|
||||||
|
failure shouldBe a[StreamDetachedException]
|
||||||
|
failure.getMessage should ===("Stream cancelled before Source Future completed")
|
||||||
}
|
}
|
||||||
|
|
||||||
"fail if the underlying Future is failed" in assertAllStagesStopped {
|
"fail if the underlying Future is failed" in assertAllStagesStopped {
|
||||||
|
|
|
||||||
|
|
@ -14,5 +14,4 @@ final class StreamDetachedException(message: String)
|
||||||
with NoStackTrace {
|
with NoStackTrace {
|
||||||
|
|
||||||
def this() = this("Stream is terminated. Materialized value is detached.")
|
def this() = this("Stream is terminated. Materialized value is detached.")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import scala.annotation.unchecked.uncheckedVariance
|
||||||
import scala.util.{ Failure, Success, Try }
|
import scala.util.{ Failure, Success, Try }
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -319,15 +320,11 @@ import scala.concurrent.{ Future, Promise }
|
||||||
|
|
||||||
override def onDownstreamFinish(): Unit = {
|
override def onDownstreamFinish(): Unit = {
|
||||||
if (!materialized.isCompleted) {
|
if (!materialized.isCompleted) {
|
||||||
// make sure we always yield the matval if possible, even if downstream cancelled
|
// we used to try to materialize the "inner" source here just to get
|
||||||
// before the source was materialized
|
// the materialized value, but that is not safe and may cause the graph shell
|
||||||
val matValFuture = futureSource.map { gr ⇒
|
// to leak/stay alive after the stage completes
|
||||||
// downstream finish means it cancelled, so we push that signal through into the future materialized source
|
|
||||||
Source.fromGraph(gr).to(Sink.cancelled)
|
materialized.tryFailure(new StreamDetachedException("Stream cancelled before Source Future completed"))
|
||||||
.withAttributes(attr)
|
|
||||||
.run()(subFusingMaterializer)
|
|
||||||
}(ExecutionContexts.sameThreadExecutionContext)
|
|
||||||
materialized.completeWith(matValFuture)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
super.onDownstreamFinish()
|
super.onDownstreamFinish()
|
||||||
|
|
|
||||||
|
|
@ -182,13 +182,16 @@ object Source {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Streams the elements of the given future source once it successfully completes.
|
* Streams the elements of the given future source once it successfully completes.
|
||||||
* If the future fails the stream is failed.
|
* If the [[Future]] fails the stream is failed with the exception from the future. If downstream cancels before the
|
||||||
|
* stream completes the materialized [[Future]] will be failed with a [[StreamDetachedException]].
|
||||||
*/
|
*/
|
||||||
def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
|
def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Streams the elements of an asynchronous source once its given `completion` stage completes.
|
* Streams the elements of an asynchronous source once its given [[CompletionStage]] completes.
|
||||||
* If the `completion` fails the stream is failed with that exception.
|
* If the [[CompletionStage]] fails the stream is failed with the exception from the future.
|
||||||
|
* If downstream cancels before the stream completes the materialized [[CompletionStage]] will be failed
|
||||||
|
* with a [[StreamDetachedException]]
|
||||||
*/
|
*/
|
||||||
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] =
|
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] =
|
||||||
new Source(scaladsl.Source.fromSourceCompletionStage(completion))
|
new Source(scaladsl.Source.fromSourceCompletionStage(completion))
|
||||||
|
|
|
||||||
|
|
@ -276,13 +276,16 @@ object Source {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Streams the elements of the given future source once it successfully completes.
|
* Streams the elements of the given future source once it successfully completes.
|
||||||
* If the future fails the stream is failed.
|
* If the [[Future]] fails the stream is failed with the exception from the future. If downstream cancels before the
|
||||||
|
* stream completes the materialized `Future` will be failed with a [[StreamDetachedException]]
|
||||||
*/
|
*/
|
||||||
def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = fromGraph(new FutureFlattenSource(future))
|
def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = fromGraph(new FutureFlattenSource(future))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Streams the elements of an asynchronous source once its given `completion` stage completes.
|
* Streams the elements of an asynchronous source once its given `completion` stage completes.
|
||||||
* If the `completion` fails the stream is failed with that exception.
|
* If the [[CompletionStage]] fails the stream is failed with the exception from the future.
|
||||||
|
* If downstream cancels before the stream completes the materialized `Future` will be failed
|
||||||
|
* with a [[StreamDetachedException]]
|
||||||
*/
|
*/
|
||||||
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
|
def fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue