diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md index f9f30da3f2..55a2ba4b5f 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md @@ -36,5 +36,9 @@ Scala **completes** when upstream completes and all futures have been completed and all elements have been emitted +**cancels** when downstream cancels (keep reading) + The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md index 02c995ea39..f4abc65522 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md @@ -29,5 +29,9 @@ and failed with a `akka.stream.NeverMaterializedException` if the stream fails o **completes** when upstream completes and all futures have been completed and all elements have been emitted +**cancels** when downstream cancels (keep reading) + The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md index e383b116c9..fe8841464f 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md @@ -35,5 +35,9 @@ See @ref:[lazyFlow](lazyFlow.md) for sample. **completes** when upstream completes and all futures have been completed and all elements have been emitted +**cancels** when downstream cancels (keep reading) + The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md index 5e147c4c35..2dd60ca08a 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md @@ -26,5 +26,9 @@ Defers creation until a first element arrives. **completes** when upstream completes and all futures have been completed and all elements have been emitted +**cancels** when downstream cancels (keep reading) + The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). @@@ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala index 0fc84c31ae..cbc39b20da 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlatMapPrefixSpec.scala @@ -8,6 +8,7 @@ import akka.{ Done, NotUsed } import akka.stream.{ AbruptStageTerminationException, AbruptTerminationException, + Attributes, Materializer, NeverMaterializedException, SubscriptionWithCancelException @@ -19,526 +20,572 @@ import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped class FlowFlatMapPrefixSpec extends StreamSpec { def src10(i: Int = 0) = Source(i until (i + 10)) - "A PrefixAndDownstream" must { - - "work in the simple identity case" in assertAllStagesStopped { - src10() - .flatMapPrefixMat(2) { _ => - Flow[Int] - }(Keep.left) - .runWith(Sink.seq[Int]) - .futureValue should ===(2 until 10) - } - - "expose mat value in the simple identity case" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(2) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 2) - suffixF.futureValue should ===(2 until 10) - } - - "work when source is exactly the required prefix" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(10) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 10) - suffixF.futureValue should be(empty) - } - - "work when source has less than the required prefix" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(20) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 10) - suffixF.futureValue should be(empty) - } - - "simple identity case when downstream completes before consuming the entire stream" in assertAllStagesStopped { - val (prefixF, suffixF) = Source(0 until 100) - .flatMapPrefixMat(10) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .take(10) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 10) - suffixF.futureValue should ===(10 until 20) - } - - "propagate failure to create the downstream flow" in assertAllStagesStopped { - val suffixF = Source(0 until 100) - .flatMapPrefixMat(10) { prefix => - throw TE(s"I hate mondays! (${prefix.size})") - }(Keep.right) - .to(Sink.ignore) - .run() - - val ex = suffixF.failed.futureValue - ex.getCause should not be null - ex.getCause should ===(TE("I hate mondays! (10)")) - } - - "propagate flow failures" in assertAllStagesStopped { - val (prefixF, suffixF) = Source(0 until 100) - .flatMapPrefixMat(10) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).map { - case 15 => throw TE("don't like 15 either!") - case n => n - } - }(Keep.right) - .toMat(Sink.ignore)(Keep.both) - .run() - prefixF.futureValue should ===(0 until 10) - val ex = suffixF.failed.futureValue - ex should ===(TE("don't like 15 either!")) - } - - "produce multiple elements per input" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(7) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)) - }(Keep.right) - .toMat(Sink.seq[Int])(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 7) - suffixF.futureValue should ===(7 :: 8 :: 8 :: 9 :: 9 :: 9 :: Nil) - } - - "succeed when upstream produces no elements" in assertAllStagesStopped { - val (prefixF, suffixF) = Source - .empty[Int] - .flatMapPrefixMat(7) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)) - }(Keep.right) - .toMat(Sink.seq[Int])(Keep.both) - .run() - - prefixF.futureValue should be(empty) - suffixF.futureValue should be(empty) - } - - "apply materialized flow's semantics when upstream produces no elements" in assertAllStagesStopped { - val (prefixF, suffixF) = Source - .empty[Int] - .flatMapPrefixMat(7) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)).prepend(Source(100 to 101)) - }(Keep.right) - .toMat(Sink.seq[Int])(Keep.both) - .run() - - prefixF.futureValue should be(empty) - suffixF.futureValue should ===(100 :: 101 :: Nil) - } - - "handles upstream completion" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Int]() - val subscriber = TestSubscriber.manualProbe[Int]() - - val matValue = Source - .fromPublisher(publisher) - .flatMapPrefixMat(2) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).prepend(Source(100 to 101)) - }(Keep.right) - .to(Sink.fromSubscriber(subscriber)) - .run() - - matValue.value should be(empty) - - val upstream = publisher.expectSubscription() - val downstream = subscriber.expectSubscription() - - downstream.request(1000) - - upstream.expectRequest() - //completing publisher - upstream.sendComplete() - - matValue.futureValue should ===(Nil) - - subscriber.expectNext(100) - - subscriber.expectNext(101).expectComplete() - - } - - "work when materialized flow produces no downstream elements" in assertAllStagesStopped { - val (prefixF, suffixF) = Source(0 until 100) - .flatMapPrefixMat(4) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).filter(_ => false) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 4) - suffixF.futureValue should be(empty) - } - - "work when materialized flow does not consume upstream" in assertAllStagesStopped { - val (prefixF, suffixF) = Source(0 until 100) - .map { i => - i should be <= 4 - i - } - .flatMapPrefixMat(4) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).take(0) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 4) - suffixF.futureValue should be(empty) - } - - "work when materialized flow cancels upstream but keep producing" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(4) { prefix => - Flow[Int].mapMaterializedValue(_ => prefix).take(0).concat(Source(11 to 12)) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should ===(0 until 4) - suffixF.futureValue should ===(11 :: 12 :: Nil) - } - - "propagate materialization failure (when application of 'f' succeeds)" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(4) { prefix => - Flow[Int].mapMaterializedValue(_ => throw TE(s"boom-bada-bang (${prefix.size})")) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.failed.futureValue should be(a[NeverMaterializedException]) - prefixF.failed.futureValue.getCause should ===(TE("boom-bada-bang (4)")) - suffixF.failed.futureValue should ===(TE("boom-bada-bang (4)")) - } - - "succeed when materialized flow completes downstream but keep consuming elements" in assertAllStagesStopped { - val (prefixAndTailF, suffixF) = src10() - .flatMapPrefixMat(4) { prefix => - Flow[Int] - .mapMaterializedValue(_ => prefix) - .viaMat { - Flow.fromSinkAndSourceMat(Sink.seq[Int], Source.empty[Int])(Keep.left) - }(Keep.both) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - suffixF.futureValue should be(empty) - val (prefix, suffix) = prefixAndTailF.futureValue - prefix should ===(0 until 4) - suffix.futureValue should ===(4 until 10) - } - - "propagate downstream cancellation via the materialized flow" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Int]() - val subscriber = TestSubscriber.manualProbe[Int]() - - val ((srcWatchTermF, innerMatVal), sinkMatVal) = src10() - .watchTermination()(Keep.right) - .flatMapPrefixMat(2) { prefix => - prefix should ===(0 until 2) - Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)) - }(Keep.both) - .take(1) - .toMat(Sink.seq)(Keep.both) - .run() - - val subUpstream = publisher.expectSubscription() - val subDownstream = subscriber.expectSubscription() - - // inner stream was materialized - innerMatVal.futureValue should ===(NotUsed) - - subUpstream.expectRequest() should be >= (1L) - subDownstream.request(1) - subscriber.expectNext(2) - subUpstream.sendNext(22) - subUpstream.expectCancellation() // because take(1) - // this should not automatically pass the cancellation upstream of nested flow - srcWatchTermF.isCompleted should ===(false) - sinkMatVal.futureValue should ===(Seq(22)) - - // the nested flow then decides to cancel, which moves upstream - subDownstream.cancel() - srcWatchTermF.futureValue should ===(Done) - } - - "early downstream cancellation is later handed out to materialized flow" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Int]() - val subscriber = TestSubscriber.manualProbe[Int]() - - val (srcWatchTermF, matFlowWatchTermFF) = Source - .fromPublisher(publisher) - .watchTermination()(Keep.right) - .flatMapPrefixMat(3) { prefix => - prefix should ===(0 until 3) - Flow[Int].watchTermination()(Keep.right) - }(Keep.both) - .to(Sink.fromSubscriber(subscriber)) - .run() - val matFlowWatchTerm = matFlowWatchTermFF.flatten - - matFlowWatchTerm.value should be(empty) - srcWatchTermF.value should be(empty) - - val subDownstream = subscriber.expectSubscription() - val subUpstream = publisher.expectSubscription() - subDownstream.request(1) - subUpstream.expectRequest() should be >= (1L) - subUpstream.sendNext(0) - subUpstream.sendNext(1) - subDownstream.cancel() - - //subflow not materialized yet, hence mat value (future) isn't ready yet - matFlowWatchTerm.value should be(empty) - srcWatchTermF.value should be(empty) - - //this one is sent AFTER downstream cancellation - subUpstream.sendNext(2) - - subUpstream.expectCancellation() - - matFlowWatchTerm.futureValue should ===(Done) - srcWatchTermF.futureValue should ===(Done) - - } - - "early downstream failure is deferred until prefix completion" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Int]() - val subscriber = TestSubscriber.manualProbe[Int]() - - val (srcWatchTermF, matFlowWatchTermFF) = Source - .fromPublisher(publisher) - .watchTermination()(Keep.right) - .flatMapPrefixMat(3) { prefix => - prefix should ===(0 until 3) - Flow[Int].watchTermination()(Keep.right) - }(Keep.both) - .to(Sink.fromSubscriber(subscriber)) - .run() - val matFlowWatchTerm = matFlowWatchTermFF.flatten - - matFlowWatchTerm.value should be(empty) - srcWatchTermF.value should be(empty) - - val subDownstream = subscriber.expectSubscription() - val subUpstream = publisher.expectSubscription() - subDownstream.request(1) - subUpstream.expectRequest() should be >= (1L) - subUpstream.sendNext(0) - subUpstream.sendNext(1) - subDownstream.asInstanceOf[SubscriptionWithCancelException].cancel(TE("that again?!")) - - matFlowWatchTerm.value should be(empty) - srcWatchTermF.value should be(empty) - - subUpstream.sendNext(2) - - matFlowWatchTerm.failed.futureValue should ===(TE("that again?!")) - srcWatchTermF.failed.futureValue should ===(TE("that again?!")) - - subUpstream.expectCancellation() - } - - "downstream failure is propagated via the materialized flow" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Int]() - val subscriber = TestSubscriber.manualProbe[Int]() - - val ((srcWatchTermF, notUsedF), suffixF) = src10() - .watchTermination()(Keep.right) - .flatMapPrefixMat(2) { prefix => - prefix should ===(0 until 2) - Flow.fromSinkAndSourceCoupled(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)) - }(Keep.both) - .map { - case 2 => 2 - case 3 => throw TE("3!?!?!?") - case i => fail(s"unexpected value $i") - } - .toMat(Sink.seq)(Keep.both) - .run() - - notUsedF.value should be(empty) - suffixF.value should be(empty) - srcWatchTermF.value should be(empty) - - val subUpstream = publisher.expectSubscription() - val subDownstream = subscriber.expectSubscription() - - notUsedF.futureValue should ===(NotUsed) - - subUpstream.expectRequest() should be >= (1L) - subDownstream.request(1) - subscriber.expectNext(2) - subUpstream.sendNext(2) - subDownstream.request(1) - subscriber.expectNext(3) - subUpstream.sendNext(3) - subUpstream.expectCancellation() should ===(TE("3!?!?!?")) - subscriber.expectError(TE("3!?!?!?")) - - suffixF.failed.futureValue should ===(TE("3!?!?!?")) - srcWatchTermF.failed.futureValue should ===(TE("3!?!?!?")) - } - - "complete mat value with failures on abrupt termination before materializing the flow" in assertAllStagesStopped { - val mat = Materializer(system) - val publisher = TestPublisher.manualProbe[Int]() - - val flow = Source - .fromPublisher(publisher) - .flatMapPrefixMat(2) { prefix => - fail(s"unexpected prefix (length = ${prefix.size})") - Flow[Int] - }(Keep.right) - .toMat(Sink.ignore)(Keep.both) - - val (prefixF, doneF) = flow.run()(mat) - - publisher.expectSubscription() - prefixF.value should be(empty) - doneF.value should be(empty) - - mat.shutdown() - - prefixF.failed.futureValue match { - case _: AbruptTerminationException => - case ex: NeverMaterializedException => - ex.getCause should not be null - ex.getCause should be(a[AbruptTerminationException]) + for { + att <- List( + Attributes.NestedMaterializationCancellationPolicy.EagerCancellation, + Attributes.NestedMaterializationCancellationPolicy.PropagateToNested) + delayDownstreanCancellation = att.propagateToNestedMaterialization + attributes = Attributes(att) + } { + + s"A PrefixAndDownstream with $att" must { + + "work in the simple identity case" in assertAllStagesStopped { + src10() + .flatMapPrefixMat(2) { _ => + Flow[Int] + }(Keep.left) + .withAttributes(attributes) + .runWith(Sink.seq[Int]) + .futureValue should ===(2 until 10) } - doneF.failed.futureValue should be(a[AbruptTerminationException]) - } - "respond to abrupt termination after flow materialization" in assertAllStagesStopped { - val mat = Materializer(system) - val countFF = src10() - .flatMapPrefixMat(2) { prefix => - prefix should ===(0 until 2) - Flow[Int] - .concat(Source.repeat(3)) - .fold(0L) { - case (acc, _) => acc + 1 + "expose mat value in the simple identity case" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(2) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + prefixF.futureValue should ===(0 until 2) + suffixF.futureValue should ===(2 until 10) + } + + "work when source is exactly the required prefix" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(10) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + prefixF.futureValue should ===(0 until 10) + suffixF.futureValue should be(empty) + } + + "work when source has less than the required prefix" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(20) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + prefixF.futureValue should ===(0 until 10) + suffixF.futureValue should be(empty) + } + + "simple identity case when downstream completes before consuming the entire stream" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .flatMapPrefixMat(10) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .take(10) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + prefixF.futureValue should ===(0 until 10) + suffixF.futureValue should ===(10 until 20) + } + + "propagate failure to create the downstream flow" in assertAllStagesStopped { + val suffixF = Source(0 until 100) + .flatMapPrefixMat(10) { prefix => + throw TE(s"I hate mondays! (${prefix.size})") + }(Keep.right) + .to(Sink.ignore) + .withAttributes(attributes) + .run + + val ex = suffixF.failed.futureValue + ex.getCause should not be null + ex.getCause should ===(TE("I hate mondays! (10)")) + } + + "propagate flow failures" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .flatMapPrefixMat(10) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).map { + case 15 => throw TE("don't like 15 either!") + case n => n } - .alsoToMat(Sink.head)(Keep.right) - }(Keep.right) - .to(Sink.ignore) - .run()(mat) - val countF = countFF.futureValue - //at this point we know the flow was materialized, now we can stop the materializer - mat.shutdown() - //expect the nested flow to be terminated abruptly. - countF.failed.futureValue should be(a[AbruptStageTerminationException]) - } - - "behave like via when n = 0" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(0) { prefix => - prefix should be(empty) - Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should be(empty) - suffixF.futureValue should ===(0 until 10) - } - - "behave like via when n = 0 and upstream produces no elements" in assertAllStagesStopped { - val (prefixF, suffixF) = Source - .empty[Int] - .flatMapPrefixMat(0) { prefix => - prefix should be(empty) - Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.futureValue should be(empty) - suffixF.futureValue should be(empty) - } - - "propagate errors during flow's creation when n = 0" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(0) { prefix => - prefix should be(empty) - throw TE("not this time my friend!") - Flow[Int].mapMaterializedValue(_ => prefix) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.failed.futureValue should be(a[NeverMaterializedException]) - prefixF.failed.futureValue.getCause === (TE("not this time my friend!")) - suffixF.failed.futureValue should ===(TE("not this time my friend!")) - } - - "propagate materialization failures when n = 0" in assertAllStagesStopped { - val (prefixF, suffixF) = src10() - .flatMapPrefixMat(0) { prefix => - prefix should be(empty) - Flow[Int].mapMaterializedValue(_ => throw TE("Bang! no materialization this time")) - }(Keep.right) - .toMat(Sink.seq)(Keep.both) - .run() - - prefixF.failed.futureValue should be(a[NeverMaterializedException]) - prefixF.failed.futureValue.getCause === (TE("Bang! no materialization this time")) - suffixF.failed.futureValue should ===(TE("Bang! no materialization this time")) - } - - "run a detached flow" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Int]() - val subscriber = TestSubscriber.manualProbe[String]() - - val detachedFlow = Flow.fromSinkAndSource(Sink.cancelled[Int], Source(List("a", "b", "c"))).via { - Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.empty[Int]) + }(Keep.right) + .toMat(Sink.ignore)(Keep.both) + .withAttributes(attributes) + .run + prefixF.futureValue should ===(0 until 10) + val ex = suffixF.failed.futureValue + ex should ===(TE("don't like 15 either!")) } - val fHeadOpt = Source - .fromPublisher(publisher) - .flatMapPrefix(2) { prefix => - prefix should ===(0 until 2) - detachedFlow + + "produce multiple elements per input" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(7) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)) + }(Keep.right) + .toMat(Sink.seq[Int])(Keep.both) + .withAttributes(attributes) + .run() + + prefixF.futureValue should ===(0 until 7) + suffixF.futureValue should ===(7 :: 8 :: 8 :: 9 :: 9 :: 9 :: Nil) + } + + "succeed when upstream produces no elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source + .empty[Int] + .flatMapPrefixMat(7) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)) + }(Keep.right) + .toMat(Sink.seq[Int])(Keep.both) + .withAttributes(attributes) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should be(empty) + } + + "apply materialized flow's semantics when upstream produces no elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source + .empty[Int] + .flatMapPrefixMat(7) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).mapConcat(n => List.fill(n - 6)(n)).prepend(Source(100 to 101)) + }(Keep.right) + .toMat(Sink.seq[Int])(Keep.both) + .withAttributes(attributes) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should ===(100 :: 101 :: Nil) + } + + "handles upstream completion" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val matValue = Source + .fromPublisher(publisher) + .flatMapPrefixMat(2) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).prepend(Source(100 to 101)) + }(Keep.right) + .to(Sink.fromSubscriber(subscriber)) + .withAttributes(attributes) + .run() + + matValue.value should be(empty) + + val upstream = publisher.expectSubscription() + val downstream = subscriber.expectSubscription() + + downstream.request(1000) + + upstream.expectRequest() + //completing publisher + upstream.sendComplete() + + matValue.futureValue should ===(Nil) + + subscriber.expectNext(100) + + subscriber.expectNext(101).expectComplete() + + } + + "work when materialized flow produces no downstream elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).filter(_ => false) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + prefixF.futureValue should ===(0 until 4) + suffixF.futureValue should be(empty) + } + + "work when materialized flow does not consume upstream" in assertAllStagesStopped { + val (prefixF, suffixF) = Source(0 until 100) + .map { i => + i should be <= 4 + i + } + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).take(0) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .withAttributes(attributes) + .run + + prefixF.futureValue should ===(0 until 4) + suffixF.futureValue should be(empty) + } + + "work when materialized flow cancels upstream but keep producing" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => prefix).take(0).concat(Source(11 to 12)) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + prefixF.futureValue should ===(0 until 4) + suffixF.futureValue should ===(11 :: 12 :: Nil) + } + + "propagate materialization failure (when application of 'f' succeeds)" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(4) { prefix => + Flow[Int].mapMaterializedValue(_ => throw TE(s"boom-bada-bang (${prefix.size})")) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + prefixF.failed.futureValue should be(a[NeverMaterializedException]) + prefixF.failed.futureValue.getCause should ===(TE("boom-bada-bang (4)")) + suffixF.failed.futureValue should ===(TE("boom-bada-bang (4)")) + } + + "succeed when materialized flow completes downstream but keep consuming elements" in assertAllStagesStopped { + val (prefixAndTailF, suffixF) = src10() + .flatMapPrefixMat(4) { prefix => + Flow[Int] + .mapMaterializedValue(_ => prefix) + .viaMat { + Flow.fromSinkAndSourceMat(Sink.seq[Int], Source.empty[Int])(Keep.left) + }(Keep.both) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run + + suffixF.futureValue should be(empty) + val (prefix, suffix) = prefixAndTailF.futureValue + prefix should ===(0 until 4) + suffix.futureValue should ===(4 until 10) + } + + "propagate downstream cancellation via the materialized flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val ((srcWatchTermF, innerMatVal), sinkMatVal) = src10() + .watchTermination()(Keep.right) + .flatMapPrefixMat(2) { prefix => + prefix should ===(0 until 2) + Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)) + }(Keep.both) + .take(1) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + val subUpstream = publisher.expectSubscription() + val subDownstream = subscriber.expectSubscription() + + // inner stream was materialized + innerMatVal.futureValue should ===(NotUsed) + + subUpstream.expectRequest() should be >= (1L) + subDownstream.request(1) + subscriber.expectNext(2) + subUpstream.sendNext(22) + subUpstream.expectCancellation() // because take(1) + // this should not automatically pass the cancellation upstream of nested flow + srcWatchTermF.isCompleted should ===(false) + sinkMatVal.futureValue should ===(Seq(22)) + + // the nested flow then decides to cancel, which moves upstream + subDownstream.cancel() + srcWatchTermF.futureValue should ===(Done) + } + + "early downstream cancellation is later handed out to materialized flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val (srcWatchTermF, matFlowWatchTermFF) = Source + .fromPublisher(publisher) + .watchTermination()(Keep.right) + .flatMapPrefixMat(3) { prefix => + prefix should ===(0 until 3) + Flow[Int].watchTermination()(Keep.right) + }(Keep.both) + .to(Sink.fromSubscriber(subscriber)) + .withAttributes(attributes) + .run() + val matFlowWatchTerm = matFlowWatchTermFF.flatten + + matFlowWatchTerm.value should be(empty) + srcWatchTermF.value should be(empty) + + val subDownstream = subscriber.expectSubscription() + val subUpstream = publisher.expectSubscription() + subDownstream.request(1) + subUpstream.expectRequest() should be >= (1L) + subUpstream.sendNext(0) + subUpstream.sendNext(1) + subDownstream.cancel() + + //subflow not materialized yet, hence mat value (future) isn't ready yet + matFlowWatchTerm.value should be(empty) + + if (delayDownstreanCancellation) { + srcWatchTermF.value should be(empty) + //this one is sent AFTER downstream cancellation + subUpstream.sendNext(2) + + subUpstream.expectCancellation() + + matFlowWatchTerm.futureValue should ===(Done) + srcWatchTermF.futureValue should ===(Done) + } else { + srcWatchTermF.futureValue should ===(Done) + matFlowWatchTerm.failed.futureValue should be(a[NeverMaterializedException]) } - .runWith(Sink.headOption) + } - subscriber.expectNoMessage() - val subsc = publisher.expectSubscription() - subsc.expectRequest() should be >= 2L - subsc.sendNext(0) - subscriber.expectNoMessage() - subsc.sendNext(1) - val sinkSubscription = subscriber.expectSubscription() - //this indicates - fHeadOpt.futureValue should be(empty) + "early downstream failure is deferred until prefix completion" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() - //materializef flow immediately cancels upstream - subsc.expectCancellation() - //at this point both ends of the 'external' fow are closed + val (srcWatchTermF, matFlowWatchTermFF) = Source + .fromPublisher(publisher) + .watchTermination()(Keep.right) + .flatMapPrefixMat(3) { prefix => + prefix should ===(0 until 3) + Flow[Int].watchTermination()(Keep.right) + }(Keep.both) + .to(Sink.fromSubscriber(subscriber)) + .withAttributes(attributes) + .run() + val matFlowWatchTerm = matFlowWatchTermFF.flatten - sinkSubscription.request(10) - subscriber.expectNext("a", "b", "c") - subscriber.expectComplete() + matFlowWatchTerm.value should be(empty) + srcWatchTermF.value should be(empty) + + val subDownstream = subscriber.expectSubscription() + val subUpstream = publisher.expectSubscription() + subDownstream.request(1) + subUpstream.expectRequest() should be >= (1L) + subUpstream.sendNext(0) + subUpstream.sendNext(1) + subDownstream.asInstanceOf[SubscriptionWithCancelException].cancel(TE("that again?!")) + + if (delayDownstreanCancellation) { + matFlowWatchTerm.value should be(empty) + srcWatchTermF.value should be(empty) + + subUpstream.sendNext(2) + + matFlowWatchTerm.failed.futureValue should ===(TE("that again?!")) + srcWatchTermF.failed.futureValue should ===(TE("that again?!")) + + subUpstream.expectCancellation() + } else { + subUpstream.expectCancellation() + srcWatchTermF.failed.futureValue should ===(TE("that again?!")) + matFlowWatchTerm.failed.futureValue should be(a[NeverMaterializedException]) + matFlowWatchTerm.failed.futureValue.getCause should ===(TE("that again?!")) + } + } + + "downstream failure is propagated via the materialized flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + val ((srcWatchTermF, notUsedF), suffixF) = src10() + .watchTermination()(Keep.right) + .flatMapPrefixMat(2) { prefix => + prefix should ===(0 until 2) + Flow.fromSinkAndSourceCoupled(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)) + }(Keep.both) + .map { + case 2 => 2 + case 3 => throw TE("3!?!?!?") + case i => fail(s"unexpected value $i") + } + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + notUsedF.value should be(empty) + suffixF.value should be(empty) + srcWatchTermF.value should be(empty) + + val subUpstream = publisher.expectSubscription() + val subDownstream = subscriber.expectSubscription() + + notUsedF.futureValue should ===(NotUsed) + + subUpstream.expectRequest() should be >= (1L) + subDownstream.request(1) + subscriber.expectNext(2) + subUpstream.sendNext(2) + subDownstream.request(1) + subscriber.expectNext(3) + subUpstream.sendNext(3) + subUpstream.expectCancellation() should ===(TE("3!?!?!?")) + subscriber.expectError(TE("3!?!?!?")) + + suffixF.failed.futureValue should ===(TE("3!?!?!?")) + srcWatchTermF.failed.futureValue should ===(TE("3!?!?!?")) + } + + "complete mat value with failures on abrupt termination before materializing the flow" in assertAllStagesStopped { + val mat = Materializer(system) + val publisher = TestPublisher.manualProbe[Int]() + + val flow = Source + .fromPublisher(publisher) + .flatMapPrefixMat(2) { prefix => + fail(s"unexpected prefix (length = ${prefix.size})") + Flow[Int] + }(Keep.right) + .toMat(Sink.ignore)(Keep.both) + .withAttributes(attributes) + + val (prefixF, doneF) = flow.run()(mat) + + publisher.expectSubscription() + prefixF.value should be(empty) + doneF.value should be(empty) + + mat.shutdown() + + prefixF.failed.futureValue match { + case _: AbruptTerminationException => + case ex: NeverMaterializedException => + ex.getCause should not be null + ex.getCause should be(a[AbruptTerminationException]) + } + doneF.failed.futureValue should be(a[AbruptTerminationException]) + } + + "respond to abrupt termination after flow materialization" in assertAllStagesStopped { + val mat = Materializer(system) + val countFF = src10() + .flatMapPrefixMat(2) { prefix => + prefix should ===(0 until 2) + Flow[Int] + .concat(Source.repeat(3)) + .fold(0L) { + case (acc, _) => acc + 1 + } + .alsoToMat(Sink.head)(Keep.right) + }(Keep.right) + .to(Sink.ignore) + .withAttributes(attributes) + .run()(mat) + val countF = countFF.futureValue + //at this point we know the flow was materialized, now we can stop the materializer + mat.shutdown() + //expect the nested flow to be terminated abruptly. + countF.failed.futureValue should be(a[AbruptStageTerminationException]) + } + + "behave like via when n = 0" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should ===(0 until 10) + } + + "behave like via when n = 0 and upstream produces no elements" in assertAllStagesStopped { + val (prefixF, suffixF) = Source + .empty[Int] + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + prefixF.futureValue should be(empty) + suffixF.futureValue should be(empty) + } + + "propagate errors during flow's creation when n = 0" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + throw TE("not this time my friend!") + Flow[Int].mapMaterializedValue(_ => prefix) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + prefixF.failed.futureValue should be(a[NeverMaterializedException]) + prefixF.failed.futureValue.getCause === (TE("not this time my friend!")) + suffixF.failed.futureValue should ===(TE("not this time my friend!")) + } + + "propagate materialization failures when n = 0" in assertAllStagesStopped { + val (prefixF, suffixF) = src10() + .flatMapPrefixMat(0) { prefix => + prefix should be(empty) + Flow[Int].mapMaterializedValue(_ => throw TE("Bang! no materialization this time")) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + prefixF.failed.futureValue should be(a[NeverMaterializedException]) + prefixF.failed.futureValue.getCause === (TE("Bang! no materialization this time")) + suffixF.failed.futureValue should ===(TE("Bang! no materialization this time")) + } + + "run a detached flow" in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Int]() + val subscriber = TestSubscriber.manualProbe[String]() + + val detachedFlow = Flow.fromSinkAndSource(Sink.cancelled[Int], Source(List("a", "b", "c"))).via { + Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.empty[Int]) + } + val fHeadOpt = Source + .fromPublisher(publisher) + .flatMapPrefix(2) { prefix => + prefix should ===(0 until 2) + detachedFlow + } + .withAttributes(attributes) + .runWith(Sink.headOption) + + subscriber.expectNoMessage() + val subsc = publisher.expectSubscription() + subsc.expectRequest() should be >= 2L + subsc.sendNext(0) + subscriber.expectNoMessage() + subsc.sendNext(1) + val sinkSubscription = subscriber.expectSubscription() + //this indicates + fHeadOpt.futureValue should be(empty) + + //materialize flow immediately cancels upstream + subsc.expectCancellation() + //at this point both ends of the 'external' fow are closed + + sinkSubscription.request(10) + subscriber.expectNext("a", "b", "c") + subscriber.expectComplete() + } } - } - } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFutureFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFutureFlowSpec.scala new file mode 100644 index 0000000000..f3abc71c98 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFutureFlowSpec.scala @@ -0,0 +1,529 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.SubscriptionWithCancelException.NonFailureCancellation +import akka.stream.{ AbruptStageTerminationException, Attributes, Materializer, NeverMaterializedException } +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped + +import scala.concurrent.{ Future, Promise } + +class FlowFutureFlowSpec extends StreamSpec { + def src10(i: Int = 0) = Source(i until (i + 10)) + def src10WithFailure(i: Int = 0)(failOn: Int) = src10(i).map { + case `failOn` => throw TE(s"fail on $failOn") + case x => x + } + + //this stage's behaviour in case of an 'early' downstream cancellation is governed by an attribute + //so we run all tests cases using both modes of the attributes. + //please notice most of the cases don't exhibit any difference in behaviour between the two modes + for { + att <- List( + Attributes.NestedMaterializationCancellationPolicy.EagerCancellation, + Attributes.NestedMaterializationCancellationPolicy.PropagateToNested) + delayDownstreanCancellation = att.propagateToNestedMaterialization + attributes = Attributes(att) + } { + + s"a futureFlow with $att" must { + "work in the simple case with a completed future" in assertAllStagesStopped { + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow { + Future.successful(Flow[Int]) + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(0 until 10) + } + + "work in the simple case with a late future" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.success(Flow[Int]) + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(0 until 10) + } + + "fail properly when future is a completed failed future" in assertAllStagesStopped { + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow { + Future.failed[Flow[Int, Int, NotUsed]](TE("damn!")) + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.failed.futureValue should be(a[NeverMaterializedException]) + fNotUsed.failed.futureValue.getCause should equal(TE("damn!")) + + fSeq.failed.futureValue should equal(TE("damn!")) + + } + + "fail properly when future is late completed failed future" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.failure(TE("damn!")) + + fNotUsed.failed.futureValue should be(a[NeverMaterializedException]) + fNotUsed.failed.futureValue.getCause should equal(TE("damn!")) + + fSeq.failed.futureValue should equal(TE("damn!")) + + } + + "handle upstream failure when future is pre-completed" in assertAllStagesStopped { + val (fNotUsed, fSeq) = src10WithFailure()(5) + .viaMat { + Flow.futureFlow { + Future.successful { + Flow[Int].recover { + case TE("fail on 5") => 99 + } + } + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(List(0, 1, 2, 3, 4, 99)) + } + + "handle upstream failure when future is late-completed" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = src10WithFailure()(5) + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.success { + Flow[Int].recover { + case TE("fail on 5") => 99 + } + } + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(List(0, 1, 2, 3, 4, 99)) + } + + "propagate upstream failure when future is pre-completed" in assertAllStagesStopped { + val (fNotUsed, fSeq) = src10WithFailure()(5) + .viaMat { + Flow.futureFlow { + Future.successful { + Flow[Int] + } + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.futureValue should be(NotUsed) + fSeq.failed.futureValue should equal(TE("fail on 5")) + } + + "propagate upstream failure when future is late-completed" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = src10WithFailure()(5) + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.success { + Flow[Int] + } + + fNotUsed.futureValue should be(NotUsed) + fSeq.failed.futureValue should equal(TE("fail on 5")) + } + + "handle early upstream error when flow future is pre-completed" in assertAllStagesStopped { + val (fNotUsed, fSeq) = Source + .failed(TE("not today my friend")) + .viaMat { + Flow.futureFlow { + Future.successful { + Flow[Int] + .recover { + case TE("not today my friend") => 99 + } + .concat(src10()) + } + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(99 +: (0 until 10)) + + } + + "handle early upstream error when flow future is late-completed" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = Source + .failed(TE("not today my friend")) + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.success { + Flow[Int] + .recover { + case TE("not today my friend") => 99 + } + .concat(src10()) + } + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(99 +: (0 until 10)) + + } + + "handle closed downstream when flow future is pre completed" in assertAllStagesStopped { + val (fSeq1, fSeq2) = src10() + .viaMat { + Flow.futureFlow { + Future.successful { + Flow[Int].alsoToMat(Sink.seq)(Keep.right) + } + } + }(Keep.right) + .mapMaterializedValue(_.flatten) + .take(0) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fSeq1.futureValue should be(empty) + fSeq2.futureValue should be(empty) + + } + + "handle closed downstream when flow future is late completed" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, Future[collection.immutable.Seq[Int]]]] + val (fSeq1, fSeq2) = src10() + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .mapMaterializedValue(_.flatten) + .take(0) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + if (delayDownstreanCancellation) { + fSeq1.value should be(empty) + fSeq2.value should be(empty) + + prFlow.success { + Flow[Int].alsoToMat(Sink.seq)(Keep.right) + } + + fSeq1.futureValue should be(empty) + fSeq2.futureValue should be(empty) + } else { + fSeq1.failed.futureValue should be(a[NeverMaterializedException]) + fSeq1.failed.futureValue.getCause should be(a[NonFailureCancellation]) + fSeq2.futureValue should be(empty) + } + } + + "handle early downstream failure when flow future is pre-completed" in assertAllStagesStopped { + val (fSeq1, fSeq2) = src10() + .viaMat { + Flow.futureFlow { + Future.successful { + Flow[Int].alsoToMat(Sink.seq)(Keep.right) + } + } + }(Keep.right) + .mapMaterializedValue(_.flatten) + .prepend(Source.failed(TE("damn!"))) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fSeq1.failed.futureValue should equal(TE("damn!")) + fSeq2.failed.futureValue should equal(TE("damn!")) + } + + "handle early downstream failure when flow future is late completed" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, Future[collection.immutable.Seq[Int]]]] + val (fSeq1, fSeq2) = src10() + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .mapMaterializedValue(_.flatten) + .prepend(Source.failed(TE("damn!"))) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + if (delayDownstreanCancellation) { + fSeq2.failed.futureValue should equal(TE("damn!")) + fSeq1.value should be(empty) + + prFlow.success { + Flow[Int].alsoToMat(Sink.seq)(Keep.right) + } + + fSeq1.failed.futureValue should equal(TE("damn!")) + } else { + fSeq1.failed.futureValue should be(a[NeverMaterializedException]) + fSeq1.failed.futureValue.getCause should equal(TE("damn!")) + fSeq2.failed.futureValue should equal(TE("damn!")) + } + } + + "handle early upstream completion when flow future is pre-completed" in assertAllStagesStopped { + val (fNotUsed, fSeq) = Source + .empty[Int] + .viaMat { + Flow.futureFlow { + Future.successful { + Flow[Int].orElse(Source.single(99)) + } + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(99 :: Nil) + } + + "handle early upstream completion when flow future is late-completed" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = Source + .empty[Int] + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.success { + Flow[Int].orElse(Source.single(99)) + } + + fNotUsed.futureValue should be(NotUsed) + fSeq.futureValue should equal(99 :: Nil) + } + + "fails properly on materialization failure with a completed future" in assertAllStagesStopped { + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow { + Future.successful(Flow[Int].mapMaterializedValue[NotUsed](_ => throw TE("BBOM!"))) + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.failed.futureValue should be(a[NeverMaterializedException]) + fNotUsed.failed.futureValue.getCause should equal(TE("BBOM!")) + fSeq.failed.futureValue should equal(TE("BBOM!")) + } + + "fails properly on materialization failure with a late future" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.success(Flow[Int].mapMaterializedValue[NotUsed](_ => throw TE("BBOM!"))) + + fNotUsed.failed.futureValue should be(a[NeverMaterializedException]) + fNotUsed.failed.futureValue.getCause should equal(TE("BBOM!")) + fSeq.failed.futureValue should equal(TE("BBOM!")) + } + + "propagate flow failures with a completed future" in assertAllStagesStopped { + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow { + Future.successful { + Flow[Int].map { + case 5 => throw TE("fail on 5") + case x => x + } + } + } + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.futureValue should be(NotUsed) + fSeq.failed.futureValue should equal(TE("fail on 5")) + } + + "propagate flow failures with a late future" in assertAllStagesStopped { + val prFlow = Promise[Flow[Int, Int, NotUsed]] + val (fNotUsed, fSeq) = src10() + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fNotUsed.value should be(empty) + fSeq.value should be(empty) + + prFlow.success { + Flow[Int].map { + case 5 => throw TE("fail on 5") + case x => x + } + } + + fNotUsed.futureValue should be(NotUsed) + fSeq.failed.futureValue should equal(TE("fail on 5")) + } + + "allow flow to handle downstream completion with a completed future" in assertAllStagesStopped { + val (fSeq1, fSeq2) = src10() + .viaMat { + Flow.futureFlow { + Future.successful { + Flow.fromSinkAndSourceMat(Sink.seq[Int], src10(10))(Keep.left) + } + } + }(Keep.right) + .take(5) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fSeq1.flatten.futureValue should be(0 until 10) + fSeq2.futureValue should equal(10 until 15) + } + + "allow flow to handle downstream completion with a late future" in assertAllStagesStopped { + val pr = Promise[Flow[Int, Int, Future[Seq[Int]]]] + val (fSeq1, fSeq2) = src10() + .viaMat { + Flow.futureFlow(pr.future) + }(Keep.right) + .take(5) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run() + + fSeq1.value should be(empty) + fSeq2.value should be(empty) + + pr.success { + Flow.fromSinkAndSourceMat(Sink.seq[Int], src10(10))(Keep.left) + } + + fSeq1.flatten.futureValue should be(0 until 10) + fSeq2.futureValue should equal(10 until 15) + } + + "abrupt termination before future completion" in assertAllStagesStopped { + val mat = Materializer(system) + val prFlow = Promise[Flow[Int, Int, Future[collection.immutable.Seq[Int]]]] + val (fSeq1, fSeq2) = src10() + .viaMat { + Flow.futureFlow(prFlow.future) + }(Keep.right) + .take(5) + .toMat(Sink.seq)(Keep.both) + .withAttributes(attributes) + .run()(mat) + + fSeq1.value should be(empty) + fSeq2.value should be(empty) + + mat.shutdown() + + fSeq1.failed.futureValue should be(a[AbruptStageTerminationException]) + fSeq2.failed.futureValue should be(a[AbruptStageTerminationException]) + } + } + } + + "NestedMaterializationCancellationPolicy" must { + "default to false" in assertAllStagesStopped { + val fl = Flow.fromMaterializer { + case (_, attributes) => + val att = attributes.mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy] + att.propagateToNestedMaterialization should be(false) + Flow[Any] + } + Source.empty.via(fl).runWith(Sink.headOption).futureValue should be(empty) + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala index 062e2e5e44..e807f54ab1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala @@ -8,10 +8,8 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ - import com.github.ghik.silencer.silent - -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.stream.AbruptStageTerminationException import akka.stream.Materializer import akka.stream.NeverMaterializedException @@ -127,7 +125,8 @@ class LazyFlowSpec extends StreamSpec(""" val deferredMatVal = result._1 val list = result._2 list.failed.futureValue shouldBe a[TE] - deferredMatVal.failed.futureValue shouldBe a[TE] + deferredMatVal.failed.futureValue shouldBe a[NeverMaterializedException] + deferredMatVal.failed.futureValue.getCause shouldBe a[TE] } "fail the flow when the future is initially failed" in assertAllStagesStopped { @@ -140,7 +139,8 @@ class LazyFlowSpec extends StreamSpec(""" val deferredMatVal = result._1 val list = result._2 list.failed.futureValue shouldBe a[TE] - deferredMatVal.failed.futureValue shouldBe a[TE] + deferredMatVal.failed.futureValue shouldBe a[NeverMaterializedException] + deferredMatVal.failed.futureValue.getCause shouldBe a[TE] } "fail the flow when the future is failed after the fact" in assertAllStagesStopped { @@ -156,7 +156,28 @@ class LazyFlowSpec extends StreamSpec(""" promise.failure(TE("later-no-flow-for-you")) list.failed.futureValue shouldBe a[TE] - deferredMatVal.failed.futureValue shouldBe a[TE] + deferredMatVal.failed.futureValue shouldBe a[NeverMaterializedException] + deferredMatVal.failed.futureValue.getCause shouldBe a[TE] + } + + "work for a single element when the future is completed after the fact" in assertAllStagesStopped { + import system.dispatcher + val flowPromise = Promise[Flow[Int, String, NotUsed]]() + val firstElementArrived = Promise[Done]() + + val result: Future[immutable.Seq[String]] = + Source(List(1)) + .via(Flow.lazyFutureFlow { () => + firstElementArrived.success(Done) + flowPromise.future + }) + .runWith(Sink.seq) + + firstElementArrived.future.map { _ => + flowPromise.success(Flow[Int].map(_.toString)) + } + + result.futureValue shouldBe List("1") } "fail the flow when the future materialization fails" in assertAllStagesStopped { @@ -170,7 +191,9 @@ class LazyFlowSpec extends StreamSpec(""" val deferredMatVal = result._1 val list = result._2 list.failed.futureValue shouldBe a[TE] - deferredMatVal.failed.futureValue shouldBe a[TE] + //futureFlow's behaviour in case of mat failure (follows flatMapPrefix) + deferredMatVal.failed.futureValue shouldBe a[NeverMaterializedException] + deferredMatVal.failed.futureValue.getCause shouldEqual TE("mat-failed") } "fail the flow when there was elements but the inner flow failed" in assertAllStagesStopped { diff --git a/akka-stream/src/main/mima-filters/2.6.5.backwards.excludes/28729-future-flow.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.5.backwards.excludes/28729-future-flow.backwards.excludes new file mode 100644 index 0000000000..3a3654c452 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.5.backwards.excludes/28729-future-flow.backwards.excludes @@ -0,0 +1,2 @@ +# Changes to internals +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.LazyFlow") diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index a06e7c0cdf..9b8aceb4c7 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -745,6 +745,7 @@ final class ActorMaterializerSettings @InternalApi private ( // for stream refs and io live with the respective stages Attributes.InputBuffer(initialInputBufferSize, maxInputBufferSize) :: Attributes.CancellationStrategy.Default :: // FIXME: make configurable, see https://github.com/akka/akka/issues/28000 + Attributes.NestedMaterializationCancellationPolicy.Default :: ActorAttributes.Dispatcher(dispatcher) :: ActorAttributes.SupervisionStrategy(supervisionDecider) :: ActorAttributes.DebugLogging(debugLogging) :: diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index f86fb8eb32..72f09ea988 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -439,6 +439,78 @@ object Attributes { strategy: CancellationStrategy.Strategy): CancellationStrategy.Strategy = CancellationStrategy.AfterDelay(delay, strategy) + /** + * Nested materialization cancellation strategy provides a way to configure the cancellation behavior of stages that materialize a nested flow. + * + * When cancelled before materializing their nested flows, these stages can either immediately cancel (default behaviour) without materializing the nested flow + * or wait for the nested flow to materialize and then propagate the cancellation signal through it. + * + * This applies to [[akka.stream.scaladsl.FlowOps.flatMapPrefix]], [[akka.stream.scaladsl.Flow.futureFlow]] (and derivations such as [[akka.stream.scaladsl.Flow.lazyFutureFlow]]). + * These operators either delay the nested flow's materialization or wait for a future to complete before doing so, + * in this period of time they may receive a downstream cancellation signal. When this happens these operators will behave according to + * this [[Attribute]]: when set to true they will 'stash' the signal and later deliver it to the materialized nested flow + * , otherwise these stages will immediately cancel without materializing the nested flow. + */ + @ApiMayChange + class NestedMaterializationCancellationPolicy private[NestedMaterializationCancellationPolicy] ( + val propagateToNestedMaterialization: Boolean) + extends MandatoryAttribute + + @ApiMayChange + object NestedMaterializationCancellationPolicy { + + /** + * A [[NestedMaterializationCancellationPolicy]] that configures graph stages + * delaying nested flow materialization to cancel immediately when downstream cancels before + * nested flow materialization. + * This applies to [[akka.stream.scaladsl.FlowOps.flatMapPrefix]], [[akka.stream.scaladsl.Flow.futureFlow]] and derived operators. + */ + val EagerCancellation = new NestedMaterializationCancellationPolicy(false) + + /** + * A [[NestedMaterializationCancellationPolicy]] that configures graph stages + * delaying nested flow materialization to delay cancellation when downstream cancels before + * nested flow materialization. Once the nested flow is materialized it will be cancelled immediately. + * This applies to [[akka.stream.scaladsl.FlowOps.flatMapPrefix]], [[akka.stream.scaladsl.Flow.futureFlow]] and derived operators. + */ + val PropagateToNested = new NestedMaterializationCancellationPolicy(true) + + /** + * Default [[NestedMaterializationCancellationPolicy]], + * please see [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.EagerCancellation()]] for details. + */ + val Default = EagerCancellation + } + + /** + * JAVA API + * A [[NestedMaterializationCancellationPolicy]] that configures graph stages + * delaying nested flow materialization to cancel immediately when downstream cancels before + * nested flow materialization. + * This applies to [[akka.stream.scaladsl.FlowOps.flatMapPrefix]], [[akka.stream.scaladsl.Flow.futureFlow]] and derived operators. + */ + @ApiMayChange + def nestedMaterializationCancellationPolicyEagerCancellation(): NestedMaterializationCancellationPolicy = + NestedMaterializationCancellationPolicy.EagerCancellation + + /** + * JAVA API + * A [[NestedMaterializationCancellationPolicy]] that configures graph stages + * delaying nested flow materialization to delay cancellation when downstream cancels before + * nested flow materialization. Once the nested flow is materialized it will be cancelled immediately. + * This applies to [[akka.stream.scaladsl.FlowOps.flatMapPrefix]], [[akka.stream.scaladsl.Flow.futureFlow]] and derived operators. + */ + @ApiMayChange + def nestedMaterializationCancellationPolicyPropagateToNested(): NestedMaterializationCancellationPolicy = + NestedMaterializationCancellationPolicy.PropagateToNested + + /** + * Default [[NestedMaterializationCancellationPolicy]], + * please see [[akka.stream.Attributes#nestedMaterializationCancellationPolicyEagerCancellation()]] for details. + */ + def nestedMaterializationCancellationPolicyDefault(): NestedMaterializationCancellationPolicy = + NestedMaterializationCancellationPolicy.Default + object LogLevels { /** Use to disable logging on certain operations when configuring [[Attributes#logLevels]] */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala index 66aff9a909..80f005093c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/FlatMapPrefix.scala @@ -27,7 +27,11 @@ import akka.util.OptionVal override def initialAttributes: Attributes = DefaultAttributes.flatMapPrefix override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M]() + val propagateToNestedMaterialization = + inheritedAttributes + .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy] + .propagateToNestedMaterialization + val matPromise = Promise[M] val logic = new GraphStageLogic(shape) with InHandler with OutHandler { val accumulated = collection.mutable.Buffer.empty[In] @@ -90,7 +94,10 @@ import akka.util.OptionVal override def onDownstreamFinish(cause: Throwable): Unit = { subSink match { - case OptionVal.None => downstreamCause = OptionVal.Some(cause) + case OptionVal.None if propagateToNestedMaterialization => downstreamCause = OptionVal.Some(cause) + case OptionVal.None => + matPromise.failure(new NeverMaterializedException(cause)) + cancelStage(cause) case OptionVal.Some(s) => s.cancel(cause) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/FutureFlow.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/FutureFlow.scala new file mode 100644 index 0000000000..8dfd00b1b7 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/FutureFlow.scala @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.stream.impl.fusing + +import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.stream.{ AbruptStageTerminationException, Attributes, FlowShape, Inlet, NeverMaterializedException, Outlet } +import akka.stream.scaladsl.{ Flow, Keep, Source } +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler } +import akka.util.OptionVal + +import scala.concurrent.{ Future, Promise } +import scala.util.{ Failure, Success, Try } + +@InternalApi private[akka] final class FutureFlow[In, Out, M](futureFlow: Future[Flow[In, Out, M]]) + extends GraphStageWithMaterializedValue[FlowShape[In, Out], Future[M]] { + val in = Inlet[In](s"${this}.in") + val out = Outlet[Out](s"${this}.out") + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val propagateToNestedMaterialization = + inheritedAttributes + .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy] + .propagateToNestedMaterialization + val innerMatValue = Promise[M] + val logic = new GraphStageLogic(shape) { + + //seems like we must set handlers BEFORE preStart + setHandlers(in, out, Initializing) + + override def preStart(): Unit = { + futureFlow.value match { + case Some(tryFlow) => + Initializing.onFuture(tryFlow) + case None => + val cb = getAsyncCallback(Initializing.onFuture) + futureFlow.onComplete(cb.invoke)(ExecutionContexts.parasitic) + //in case both ports are closed before future completion + setKeepGoing(true) + } + } + + override def postStop(): Unit = { + if (!innerMatValue.isCompleted) { + innerMatValue.failure(new AbruptStageTerminationException(this)) + } + } + + object Initializing extends InHandler with OutHandler { + // we don't expect a push since we bever pull upstream during initialization + override def onPush(): Unit = throw new IllegalStateException("unexpected push during initialization") + + var upstreamFailure = OptionVal.none[Throwable] + + override def onUpstreamFailure(ex: Throwable): Unit = { + upstreamFailure = OptionVal.Some(ex) + } + + //will later be propagated to the materialized flow (by examining isClosed(in)) + override def onUpstreamFinish(): Unit = {} + + //will later be propagated to the materialized flow (by examining isAvailable(out)) + override def onPull(): Unit = {} + + var downstreamCause = OptionVal.none[Throwable] + + override def onDownstreamFinish(cause: Throwable): Unit = + if (propagateToNestedMaterialization) { + downstreamCause = OptionVal.Some(cause) + } else { + innerMatValue.failure(new NeverMaterializedException(cause)) + cancelStage(cause) + } + + def onFuture(futureRes: Try[Flow[In, Out, M]]) = futureRes match { + case Failure(exception) => + setKeepGoing(false) + innerMatValue.failure(new NeverMaterializedException(exception)) + failStage(exception) + case Success(flow) => + //materialize flow, connect inlet and outlet, feed with potential events and set handlers + connect(flow) + setKeepGoing(false) + } + + def connect(flow: Flow[In, Out, M]): Unit = { + val subSource = new SubSourceOutlet[In](s"${FutureFlow.this}.subIn") + val subSink = new SubSinkInlet[Out](s"${FutureFlow.this}.subOut") + + subSource.setHandler { + new OutHandler { + override def onPull(): Unit = if (!isClosed(in)) tryPull(in) + + override def onDownstreamFinish(cause: Throwable): Unit = if (!isClosed(in)) cancel(in, cause) + } + } + subSink.setHandler { + new InHandler { + override def onPush(): Unit = push(out, subSink.grab()) + + override def onUpstreamFinish(): Unit = complete(out) + + override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex) + } + } + Try { + Source.fromGraph(subSource.source).viaMat(flow)(Keep.right).to(subSink.sink).run()(subFusingMaterializer) + } match { + case Success(matVal) => + innerMatValue.success(matVal) + upstreamFailure match { + case OptionVal.Some(ex) => + subSource.fail(ex) + case OptionVal.None => + if (isClosed(in)) + subSource.complete() + } + downstreamCause match { + case OptionVal.Some(cause) => + subSink.cancel(cause) + case OptionVal.None => + if (isAvailable(out)) subSink.pull() + } + setHandlers(in, out, new InHandler with OutHandler { + override def onPull(): Unit = subSink.pull() + + override def onDownstreamFinish(cause: Throwable): Unit = subSink.cancel(cause) + + override def onPush(): Unit = subSource.push(grab(in)) + + override def onUpstreamFinish(): Unit = subSource.complete() + + override def onUpstreamFailure(ex: Throwable): Unit = subSource.fail(ex) + }) + case Failure(ex) => + innerMatValue.failure(new NeverMaterializedException(ex)) + failStage(ex) + } + } + } + } + (logic, innerMatValue.future) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index c187c9f706..7339cd67b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder -import scala.concurrent.{ Future, Promise } +import scala.concurrent.Future import scala.concurrent.duration.{ FiniteDuration, _ } import scala.util.{ Failure, Success, Try } import scala.util.control.{ NoStackTrace, NonFatal } @@ -19,7 +19,6 @@ import com.github.ghik.silencer.silent import akka.actor.{ ActorRef, Terminated } import akka.annotation.{ DoNotInherit, InternalApi } -import akka.dispatch.ExecutionContexts import akka.event.{ LogMarker, LogSource, Logging, LoggingAdapter, MarkerLoggingAdapter } import akka.event.Logging.LogLevel import akka.stream.{ Supervision, _ } @@ -29,7 +28,7 @@ import akka.stream.OverflowStrategies._ import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl } import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.scaladsl.{ DelayStrategy, Flow, Keep, Source } +import akka.stream.scaladsl.{ DelayStrategy, Source } import akka.stream.stage._ import akka.util.OptionVal import akka.util.unused @@ -2224,199 +2223,3 @@ private[stream] object Collect { override def toString = "StatefulMapConcat" } - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class LazyFlow[I, O, M](flowFactory: I => Future[Flow[I, O, M]]) - extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[M]] { - - // FIXME: when removing the deprecated I => Flow factories we can remove that complication from this stage - - val in = Inlet[I]("LazyFlow.in") - val out = Outlet[O]("LazyFlow.out") - - override def initialAttributes = DefaultAttributes.lazyFlow - - override val shape: FlowShape[I, O] = FlowShape.of(in, out) - - override def toString: String = "LazyFlow" - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M]() - val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - var switching = false - - // - // implementation of handler methods in initial state - // - private def onFlowFutureComplete(firstElement: I)(result: Try[Flow[I, O, M]]) = result match { - case Success(flow) => - // check if the stage is still in need for the lazy flow - // (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise) - if (!matPromise.isCompleted) { - try { - val mat = switchTo(flow, firstElement) - matPromise.success(mat) - } catch { - case NonFatal(e) => - matPromise.failure(e) - failStage(e) - } - } - case Failure(e) => - matPromise.failure(e) - failStage(e) - } - - override def onPush(): Unit = - try { - val element = grab(in) - switching = true - val futureFlow = flowFactory(element) - - // optimization avoid extra scheduling if already completed - futureFlow.value match { - case Some(completed) => - onFlowFutureComplete(element)(completed) - case None => - val cb = getAsyncCallback[Try[Flow[I, O, M]]](onFlowFutureComplete(element)) - futureFlow.onComplete(cb.invoke)(ExecutionContexts.parasitic) - } - } catch { - case NonFatal(e) => - matPromise.failure(e) - failStage(e) - } - - override def onUpstreamFinish(): Unit = { - if (!matPromise.isCompleted) - matPromise.tryFailure(new NeverMaterializedException) - // ignore onUpstreamFinish while the stage is switching but setKeepGoing - if (switching) { - setKeepGoing(true) - } else { - super.onUpstreamFinish() - } - } - - override def onUpstreamFailure(ex: Throwable): Unit = { - super.onUpstreamFailure(ex) - if (!matPromise.isCompleted) - matPromise.tryFailure(new NeverMaterializedException(ex)) - } - - override def onPull(): Unit = { - pull(in) - } - - override def postStop(): Unit = { - if (!matPromise.isCompleted) - matPromise.tryFailure(new AbruptStageTerminationException(this)) - } - - setHandler(in, this) - setHandler(out, this) - - private def switchTo(flow: Flow[I, O, M], firstElement: I): M = { - - // - // ports are wired in the following way: - // - // in ~> subOutlet ~> lazyFlow ~> subInlet ~> out - // - - val subInlet = new SubSinkInlet[O]("LazyFlowSubSink") - val subOutlet = new SubSourceOutlet[I]("LazyFlowSubSource") - - val matVal = Source - .fromGraph(subOutlet.source) - .prepend(Source.single(firstElement)) - .viaMat(flow)(Keep.right) - .toMat(subInlet.sink)(Keep.left) - .run()(interpreter.subFusingMaterializer) - - // The lazily materialized flow may be constructed from a sink and a source. Therefore termination - // signals (completion, cancellation, and errors) are not guaranteed to pass through the flow. This - // means that this stage must not be completed as soon as one side of the flow is finished. - // - // Invariant: isClosed(out) == subInlet.isClosed after each event because termination signals (i.e. - // completion, cancellation, and failure) between these two ports are always forwarded. - // - // However, isClosed(in) and subOutlet.isClosed may be different. This happens if upstream completes before - // the cached element was pushed. - def maybeCompleteStage(): Unit = { - if (isClosed(in) && subOutlet.isClosed && isClosed(out)) { - completeStage() - } - } - - // The stage must not be shut down automatically; it is completed when maybeCompleteStage decides - setKeepGoing(true) - - setHandler( - in, - new InHandler { - override def onPush(): Unit = { - subOutlet.push(grab(in)) - } - override def onUpstreamFinish(): Unit = { - subOutlet.complete() - maybeCompleteStage() - } - override def onUpstreamFailure(ex: Throwable): Unit = { - // propagate exception irrespective if the cached element has been pushed or not - subOutlet.fail(ex) - maybeCompleteStage() - } - }) - - setHandler(out, new OutHandler { - override def onPull(): Unit = { - subInlet.pull() - } - override def onDownstreamFinish(cause: Throwable): Unit = { - subInlet.cancel(cause) - maybeCompleteStage() - } - }) - - subOutlet.setHandler(new OutHandler { - override def onPull(): Unit = { - pull(in) - } - override def onDownstreamFinish(cause: Throwable): Unit = { - if (!isClosed(in)) { - cancel(in, cause) - } - maybeCompleteStage() - } - }) - - subInlet.setHandler(new InHandler { - override def onPush(): Unit = { - push(out, subInlet.grab()) - } - override def onUpstreamFinish(): Unit = { - complete(out) - maybeCompleteStage() - } - override def onUpstreamFailure(ex: Throwable): Unit = { - fail(out, ex) - maybeCompleteStage() - } - }) - - if (isClosed(out)) { - // downstream may have been canceled while the stage was switching - subInlet.cancel() - } else { - subInlet.pull() - } - - matVal - } - } - (stageLogic, matPromise.future) - } -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f29779b5c7..d4c02a9749 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -6,7 +6,6 @@ package akka.stream.javadsl import java.util.Comparator import java.util.Optional -import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage import java.util.function.BiFunction import java.util.function.Supplier @@ -30,7 +29,6 @@ import akka.japi.Util import akka.japi.function import akka.japi.function.Creator import akka.stream._ -import akka.stream.impl.fusing.LazyFlow import akka.util.ConstantFun import akka.util.JavaDurationConverters._ import akka.util.Timeout @@ -263,9 +261,9 @@ object Flow { flowFactory: function.Function[I, CompletionStage[Flow[I, O, M]]], fallback: function.Creator[M]): Flow[I, O, M] = { import scala.compat.java8.FutureConverters._ - val sflow = scaladsl.Flow - .fromGraph(new LazyFlow[I, O, M](t => flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.parasitic))) - .mapMaterializedValue(_ => fallback.create()) + val sflow = scaladsl.Flow.lazyInit( + (flowFactory.apply(_)).andThen(_.toScala.map(_.asScala)(ExecutionContexts.parasitic)), + fallback.create _) new Flow(sflow) } @@ -304,8 +302,12 @@ object Flow { * The materialized completion stage value is completed with the materialized value of the future flow or failed with a * [[NeverMaterializedException]] if upstream fails or downstream cancels before the completion stage has completed. */ - def completionStageFlow[I, O, M](flow: CompletionStage[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = - lazyCompletionStageFlow(() => flow) + def completionStageFlow[I, O, M](flow: CompletionStage[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = { + import scala.compat.java8.FutureConverters._ + val sflow = + scaladsl.Flow.futureFlow(flow.toScala.map(_.asScala)(ExecutionContexts.parasitic)).mapMaterializedValue(_.toJava) + new javadsl.Flow(sflow) + } /** * Defers invoking the `create` function to create a future flow until there is downstream demand and passing @@ -322,8 +324,15 @@ object Flow { * * '''Cancels when''' downstream cancels */ - def lazyFlow[I, O, M](create: Creator[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = - lazyCompletionStageFlow(() => CompletableFuture.completedFuture(create.create())) + def lazyFlow[I, O, M](create: Creator[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = { + import scala.compat.java8.FutureConverters._ + val sflow = scaladsl.Flow + .lazyFlow { () => + create.create().asScala + } + .mapMaterializedValue(_.toJava) + new javadsl.Flow(sflow) + } /** * Defers invoking the `create` function to create a future flow until there downstream demand has caused upstream diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 6f0db2bc75..9ffefd69d2 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -578,13 +578,22 @@ object Flow { * * '''Completes when''' upstream completes and all elements have been emitted from the internal flow * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream cancels (see below) + * + * The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + * This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + * this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). */ @deprecated( "Use 'Flow.futureFlow' in combination with prefixAndTail(1) instead, see `futureFlow` operator docs for details", "2.6.0") def lazyInit[I, O, M](flowFactory: I => Future[Flow[I, O, M]], fallback: () => M): Flow[I, O, M] = - Flow.fromGraph(new LazyFlow[I, O, M](flowFactory)).mapMaterializedValue(_ => fallback()) + Flow[I] + .flatMapPrefix(1) { + case Seq(a) => futureFlow(flowFactory(a)).mapMaterializedValue(_ => NotUsed) + case Nil => Flow[I].asInstanceOf[Flow[I, O, NotUsed]] + } + .mapMaterializedValue(_ => fallback()) /** * Creates a real `Flow` upon receiving the first element. Internal `Flow` will not be created @@ -600,13 +609,17 @@ object Flow { * * '''Completes when''' upstream completes and all elements have been emitted from the internal flow * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream cancels (see below) + * + * The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + * This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + * this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). */ @deprecated("Use 'Flow.lazyFutureFlow' instead", "2.6.0") def lazyInitAsync[I, O, M](flowFactory: () => Future[Flow[I, O, M]]): Flow[I, O, Future[Option[M]]] = - Flow.fromGraph(new LazyFlow[I, O, M](_ => flowFactory())).mapMaterializedValue { v => + Flow.lazyFutureFlow(flowFactory).mapMaterializedValue { implicit val ec = akka.dispatch.ExecutionContexts.parasitic - v.map[Option[M]](Some.apply _).recover { case _: NeverMaterializedException => None } + _.map(Some.apply).recover { case _: NeverMaterializedException => None } } /** @@ -615,9 +628,13 @@ object Flow { * * The materialized future value is completed with the materialized value of the future flow or failed with a * [[NeverMaterializedException]] if upstream fails or downstream cancels before the future has completed. + * + * The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + * This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + * this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). */ def futureFlow[I, O, M](flow: Future[Flow[I, O, M]]): Flow[I, O, Future[M]] = - lazyFutureFlow(() => flow) + Flow.fromGraph(new FutureFlow(flow)) /** * Defers invoking the `create` function to create a future flow until there is downstream demand and passing @@ -638,7 +655,11 @@ object Flow { * * '''Completes when''' upstream completes and all elements have been emitted from the internal flow * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream cancels (see below) + * + * The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + * This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + * this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). */ def lazyFlow[I, O, M](create: () => Flow[I, O, M]): Flow[I, O, Future[M]] = lazyFutureFlow(() => Future.successful(create())) @@ -662,10 +683,27 @@ object Flow { * * '''Completes when''' upstream completes and all elements have been emitted from the internal flow * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream cancels (see below) + * + * The operator's default behaviour in case of downstream cancellation before nested flow materialization (future completion) is to cancel immediately. + * This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, + * this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). */ def lazyFutureFlow[I, O, M](create: () => Future[Flow[I, O, M]]): Flow[I, O, Future[M]] = - Flow.fromGraph(new LazyFlow(_ => create())) + Flow[I] + .flatMapPrefixMat(1) { + case Seq(a) => + val f: Flow[I, O, Future[M]] = + futureFlow(create() + .map(Flow[I].prepend(Source.single(a)).viaMat(_)(Keep.right))(akka.dispatch.ExecutionContexts.parasitic)) + f + case Nil => + val f: Flow[I, O, Future[M]] = Flow[I] + .asInstanceOf[Flow[I, O, NotUsed]] + .mapMaterializedValue(_ => Future.failed[M](new NeverMaterializedException())) + f + }(Keep.right) + .mapMaterializedValue(_.flatten) } @@ -1945,7 +1983,9 @@ trait FlowOps[+Out, +Mat] { * the resulting flow will be materialized and signalled for upstream completion, it can then complete or continue to emit elements at its own discretion. * * '''Cancels when''' the materialized flow cancels. - * Notice that when downstream cancels prior to prefix completion, the cancellation cause is stashed until prefix completion (or upstream completion) and then handed to the materialized flow. + * When downstream cancels before materialization of the nested flow, the operator's default behaviour is to cancel immediately, + * this behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy]] attribute on the flow. + * When this attribute is configured to true, downstream cancellation is delayed until the nested flow's materialization which is then immediately cancelled (with the original cancellation cause). * * @param n the number of elements to accumulate before materializing the downstream flow. * @param f a function that produces the downstream flow based on the upstream's prefix.