Support for null in mapAsync and fromCompletionStage #25475

This commit is contained in:
mszczygiel 2019-04-16 09:08:05 +02:00 committed by Johan Andrén
parent 549ccb78a6
commit d4813b91c3
9 changed files with 134 additions and 48 deletions

View file

@ -18,6 +18,7 @@ Pass incoming elements to a function that return a @scala[`Future`] @java[`Compl
downstream. Up to `n` elements can be processed concurrently, but regardless of their completion time the incoming
order will be kept when results complete. For use cases where order does not matter `mapAsyncUnordered` can be used.
If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream.
If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)

View file

@ -17,6 +17,7 @@ Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed
Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements
that triggered them.
If a @scala[`Future`] @java[`CompletionStage`] completes with `null`, element is not passed downstream.
If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)

View file

@ -15,7 +15,8 @@ Send the single value of the `CompletionStage` when it completes and there is de
## Description
Send the single value of the `CompletionStage` when it completes and there is demand.
If the future fails the stream is failed with that exception.
If the `CompletionStage` completes with `null` stage is completed without emitting a value.
If the `CompletionStage` fails the stream is failed with that exception.
@@@div { .callout }

View file

@ -813,6 +813,21 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("C");
}
@Test
public void mustBeAbleToUseMapAsyncForFutureWithNullResult() throws Exception {
final Iterable<Integer> input = Arrays.asList(1, 2, 3);
Flow<Integer, Void, NotUsed> flow =
Flow.of(Integer.class).mapAsync(1, x -> CompletableFuture.completedFuture(null));
List<Void> result =
Source.from(input)
.via(flow)
.runWith(Sink.seq(), materializer)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
assertEquals(0, result.size());
}
@Test
public void mustBeAbleToUseCollectType() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -585,6 +585,15 @@ public class SourceTest extends StreamTest {
assertEquals("A", result);
}
@Test
public void mustWorkFromFutureVoid() throws Exception {
CompletionStage<Void> future = CompletableFuture.completedFuture(null);
CompletionStage<List<Void>> future2 =
Source.fromCompletionStage(future).runWith(Sink.seq(), materializer);
List<Void> result = future2.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(0, result.size());
}
@Test
public void mustWorkFromRange() throws Exception {
CompletionStage<List<Integer>> f =

View file

@ -331,25 +331,54 @@ class FlowMapAsyncSpec extends StreamSpec {
c.expectComplete()
}
"signal NPE when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]()
val p = Source(List("a", "b")).mapAsync(4)(elem => Future.successful(null)).to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError().getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
"ignore element when future is completed with null" in {
val flow = Flow[Int].mapAsync[String](2) {
case 2 => Future.successful(null)
case x => Future.successful(x.toString)
}
val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq)
result.futureValue should ===(Seq("1", "3"))
}
"resume when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]()
val p = Source(List("a", "b", "c"))
.mapAsync(4)(elem => if (elem == "b") Future.successful(null) else Future.successful(elem))
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink.fromSubscriber(c))
.run()
val sub = c.expectSubscription()
sub.request(10)
for (elem <- List("a", "c")) c.expectNext(elem)
c.expectComplete()
"continue emitting after a sequence of nulls" in {
val flow = Flow[Int].mapAsync[String](3) { value =>
if (value == 0 || value >= 100) Future.successful(value.toString)
else Future.successful(null)
}
val result = Source(0 to 102).via(flow).runWith(Sink.seq)
result.futureValue should ===(Seq("0", "100", "101", "102"))
}
"complete without emitting any element after a sequence of nulls only" in {
val flow = Flow[Int].mapAsync[String](3) { _ =>
Future.successful(null)
}
val result = Source(0 to 200).via(flow).runWith(Sink.seq)
result.futureValue shouldBe empty
}
"complete stage if future with null result is completed last" in {
import system.dispatcher
val latch = TestLatch(2)
val flow = Flow[Int].mapAsync[String](2) {
case 2 =>
Future {
Await.ready(latch, 10 seconds)
null
}
case x =>
latch.countDown()
Future.successful(x.toString)
}
val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq)
result.futureValue should ===(Seq("1", "3"))
}
"should handle cancel properly" in assertAllStagesStopped {

View file

@ -213,26 +213,55 @@ class FlowMapAsyncUnorderedSpec extends StreamSpec {
.expectComplete()
}
"signal NPE when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]()
val p =
Source(List("a", "b")).mapAsyncUnordered(4)(elem => Future.successful(null)).to(Sink.fromSubscriber(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
"ignore element when future is completed with null" in {
val flow = Flow[Int].mapAsyncUnordered[String](2) {
case 2 => Future.successful(null)
case x => Future.successful(x.toString)
}
val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq)
result.futureValue should contain only ("1", "3")
}
"resume when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]()
val p = Source(List("a", "b", "c"))
.mapAsyncUnordered(4)(elem => if (elem == "b") Future.successful(null) else Future.successful(elem))
.withAttributes(supervisionStrategy(resumingDecider))
.to(Sink.fromSubscriber(c))
.run()
val sub = c.expectSubscription()
sub.request(10)
c.expectNextUnordered("a", "c")
c.expectComplete()
"continue emitting after a sequence of nulls" in {
val flow = Flow[Int].mapAsyncUnordered[String](3) { value =>
if (value == 0 || value >= 100) Future.successful(value.toString)
else Future.successful(null)
}
val result = Source(0 to 102).via(flow).runWith(Sink.seq)
result.futureValue should contain only ("0", "100", "101", "102")
}
"complete without emitting any element after a sequence of nulls only" in {
val flow = Flow[Int].mapAsyncUnordered[String](3) { _ =>
Future.successful(null)
}
val result = Source(0 to 200).via(flow).runWith(Sink.seq)
result.futureValue shouldBe empty
}
"complete stage if future with null result is completed last" in {
import system.dispatcher
val latch = TestLatch(2)
val flow = Flow[Int].mapAsyncUnordered[String](2) {
case 2 =>
Future {
Await.ready(latch, 10 seconds)
null
}
case x =>
latch.countDown()
Future.successful(x.toString)
}
val result = Source(List(1, 2, 3)).via(flow).runWith(Sink.seq)
result.futureValue should contain only ("1", "3")
}
"handle cancel properly" in assertAllStagesStopped {

View file

@ -379,8 +379,9 @@ import scala.concurrent.{ Future, Promise }
def onFutureCompleted(result: Try[T]): Unit = {
result match {
case scala.util.Success(v) => emit(out, v, () => completeStage())
case scala.util.Failure(t) => failStage(t)
case scala.util.Success(null) => completeStage()
case scala.util.Success(v) => emit(out, v, () => completeStage())
case scala.util.Failure(t) => failStage(t)
}
}

View file

@ -1208,10 +1208,7 @@ private[stream] object Collect {
}
def setElem(t: Try[T]): Unit = {
elem = t match {
case Success(null) => Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException)
case other => other
}
elem = t
}
override def apply(t: Try[T]): Unit = {
@ -1297,10 +1294,14 @@ private[stream] object Collect {
else if (isAvailable(out)) {
val holder = buffer.dequeue()
holder.elem match {
case Success(elem) =>
case Success(elem) if elem != null =>
push(out, elem)
pullIfNeeded()
case Success(null) =>
pullIfNeeded()
pushNextIfPossible()
case Failure(NonFatal(ex)) =>
holder.supervisionDirectiveFor(decider, ex) match {
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
@ -1356,11 +1357,10 @@ private[stream] object Collect {
if (!hasBeenPulled(in)) tryPull(in)
push(out, elem)
} else buffer.enqueue(elem)
case other =>
val ex = other match {
case Failure(t) => t
case Success(s) if s == null => ReactiveStreamsCompliance.elementMustNotBeNullException
}
case Success(null) =>
if (isClosed(in) && todo == 0) completeStage()
else if (!hasBeenPulled(in)) tryPull(in)
case Failure(ex) =>
if (decider(ex) == Supervision.Stop) failStage(ex)
else if (isClosed(in) && todo == 0) completeStage()
else if (!hasBeenPulled(in)) tryPull(in)