diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md index 175e784f2e..7f302009b7 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md @@ -18,6 +18,7 @@ Pass incoming elements to a function that return a @scala[`Future`] @java[`Compl downstream. Up to `n` elements can be processed concurrently, but regardless of their completion time the incoming order will be kept when results complete. For use cases where order does not matter `mapAsyncUnordered` can be used. +If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream. If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied) diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md index eb0fc39532..3bbe23917e 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncUnordered.md @@ -17,6 +17,7 @@ Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements that triggered them. +If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream. If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied) diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md b/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md index 7310b7d7fa..29d2fceefb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md @@ -15,7 +15,8 @@ Send the single value of the `CompletionStage` when it completes and there is de ## Description Send the single value of the `CompletionStage` when it completes and there is demand. -If the future fails the stream is failed with that exception. +If the `CompletionStage` completes with `null` stage is completed without emitting a value. +If the `CompletionStage` fails the stream is failed with that exception. @@@div { .callout } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 78bda44fe5..62aa01477d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -813,6 +813,21 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("C"); } + @Test + public void mustBeAbleToUseMapAsyncForFutureWithNullResult() throws Exception { + final Iterable input = Arrays.asList(1, 2, 3); + Flow flow = + Flow.of(Integer.class).mapAsync(1, x -> CompletableFuture.completedFuture(null)); + List result = + Source.from(input) + .via(flow) + .runWith(Sink.seq(), materializer) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(0, result.size()); + } + @Test public void mustBeAbleToUseCollectType() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 485be034bb..ff8ce69619 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -585,6 +585,15 @@ public class SourceTest extends StreamTest { assertEquals("A", result); } + @Test + public void mustWorkFromFutureVoid() throws Exception { + CompletionStage future = CompletableFuture.completedFuture(null); + CompletionStage> future2 = + Source.fromCompletionStage(future).runWith(Sink.seq(), materializer); + List result = future2.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(0, result.size()); + } + @Test public void mustWorkFromRange() throws Exception { CompletionStage> f = diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 868c62ada4..29e745ec2f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -331,25 +331,54 @@ class FlowMapAsyncSpec extends StreamSpec { c.expectComplete() } - "signal NPE when future is completed with null" in { - val c = TestSubscriber.manualProbe[String]() - val p = Source(List("a", "b")).mapAsync(4)(elem => Future.successful(null)).to(Sink.fromSubscriber(c)).run() - val sub = c.expectSubscription() - sub.request(10) - c.expectError().getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) + "ignore element when future is completed with null" in { + val flow = Flow[Int].mapAsync[String](2) { + case 2 => Future.successful(null) + case x => Future.successful(x.toString) + } + val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq) + result.futureValue should ===(Seq("1", "3")) } - "resume when future is completed with null" in { - val c = TestSubscriber.manualProbe[String]() - val p = Source(List("a", "b", "c")) - .mapAsync(4)(elem => if (elem == "b") Future.successful(null) else Future.successful(elem)) - .withAttributes(supervisionStrategy(resumingDecider)) - .to(Sink.fromSubscriber(c)) - .run() - val sub = c.expectSubscription() - sub.request(10) - for (elem <- List("a", "c")) c.expectNext(elem) - c.expectComplete() + "continue emitting after a sequence of nulls" in { + val flow = Flow[Int].mapAsync[String](3) { value => + if (value == 0 || value >= 100) Future.successful(value.toString) + else Future.successful(null) + } + + val result = Source(0 to 102).via(flow).runWith(Sink.seq) + + result.futureValue should ===(Seq("0", "100", "101", "102")) + } + + "complete without emitting any element after a sequence of nulls only" in { + val flow = Flow[Int].mapAsync[String](3) { _ => + Future.successful(null) + } + + val result = Source(0 to 200).via(flow).runWith(Sink.seq) + + result.futureValue shouldBe empty + } + + "complete stage if future with null result is completed last" in { + import system.dispatcher + val latch = TestLatch(2) + + val flow = Flow[Int].mapAsync[String](2) { + case 2 => + Future { + Await.ready(latch, 10 seconds) + null + } + case x => + latch.countDown() + Future.successful(x.toString) + } + + val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq) + + result.futureValue should ===(Seq("1", "3")) } "should handle cancel properly" in assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index 6730481e6c..390622eb6c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -213,26 +213,55 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec { .expectComplete() } - "signal NPE when future is completed with null" in { - val c = TestSubscriber.manualProbe[String]() - val p = - Source(List("a", "b")).mapAsyncUnordered(4)(elem => Future.successful(null)).to(Sink.fromSubscriber(c)).run() - val sub = c.expectSubscription() - sub.request(10) - c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) + "ignore element when future is completed with null" in { + val flow = Flow[Int].mapAsyncUnordered[String](2) { + case 2 => Future.successful(null) + case x => Future.successful(x.toString) + } + val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq) + + result.futureValue should contain only ("1", "3") } - "resume when future is completed with null" in { - val c = TestSubscriber.manualProbe[String]() - val p = Source(List("a", "b", "c")) - .mapAsyncUnordered(4)(elem => if (elem == "b") Future.successful(null) else Future.successful(elem)) - .withAttributes(supervisionStrategy(resumingDecider)) - .to(Sink.fromSubscriber(c)) - .run() - val sub = c.expectSubscription() - sub.request(10) - c.expectNextUnordered("a", "c") - c.expectComplete() + "continue emitting after a sequence of nulls" in { + val flow = Flow[Int].mapAsyncUnordered[String](3) { value => + if (value == 0 || value >= 100) Future.successful(value.toString) + else Future.successful(null) + } + + val result = Source(0 to 102).via(flow).runWith(Sink.seq) + + result.futureValue should contain only ("0", "100", "101", "102") + } + + "complete without emitting any element after a sequence of nulls only" in { + val flow = Flow[Int].mapAsyncUnordered[String](3) { _ => + Future.successful(null) + } + + val result = Source(0 to 200).via(flow).runWith(Sink.seq) + + result.futureValue shouldBe empty + } + + "complete stage if future with null result is completed last" in { + import system.dispatcher + val latch = TestLatch(2) + + val flow = Flow[Int].mapAsyncUnordered[String](2) { + case 2 => + Future { + Await.ready(latch, 10 seconds) + null + } + case x => + latch.countDown() + Future.successful(x.toString) + } + + val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq) + + result.futureValue should contain only ("1", "3") } "handle cancel properly" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 23f61a7ea8..d6f59772c8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -379,8 +379,9 @@ import scala.concurrent.{ Future, Promise } def onFutureCompleted(result: Try[T]): Unit = { result match { - case scala.util.Success(v) => emit(out, v, () => completeStage()) - case scala.util.Failure(t) => failStage(t) + case scala.util.Success(null) => completeStage() + case scala.util.Success(v) => emit(out, v, () => completeStage()) + case scala.util.Failure(t) => failStage(t) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d1c0c3cbda..f63fc58087 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1208,10 +1208,7 @@ private[stream] object Collect { } def setElem(t: Try[T]): Unit = { - elem = t match { - case Success(null) => Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException) - case other => other - } + elem = t } override def apply(t: Try[T]): Unit = { @@ -1297,10 +1294,14 @@ private[stream] object Collect { else if (isAvailable(out)) { val holder = buffer.dequeue() holder.elem match { - case Success(elem) => + case Success(elem) if elem != null => push(out, elem) pullIfNeeded() + case Success(null) => + pullIfNeeded() + pushNextIfPossible() + case Failure(NonFatal(ex)) => holder.supervisionDirectiveFor(decider, ex) match { // this could happen if we are looping in pushNextIfPossible and end up on a failed future before the @@ -1356,11 +1357,10 @@ private[stream] object Collect { if (!hasBeenPulled(in)) tryPull(in) push(out, elem) } else buffer.enqueue(elem) - case other => - val ex = other match { - case Failure(t) => t - case Success(s) if s == null => ReactiveStreamsCompliance.elementMustNotBeNullException - } + case Success(null) => + if (isClosed(in) && todo == 0) completeStage() + else if (!hasBeenPulled(in)) tryPull(in) + case Failure(ex) => if (decider(ex) == Supervision.Stop) failStage(ex) else if (isClosed(in) && todo == 0) completeStage() else if (!hasBeenPulled(in)) tryPull(in)