From 4949b12c5a555c3c2d5ce9edfb6742c7d4cb4672 Mon Sep 17 00:00:00 2001 From: kerr Date: Wed, 20 Apr 2022 19:06:16 +0800 Subject: [PATCH] +str Add Flow#flattenOptional (#31285) --- .../stream/operators/Flow/flattenOptional.md | 29 ++++++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../java/akka/stream/javadsl/SourceTest.java | 40 +++++++++++++++++-- .../main/scala/akka/stream/javadsl/Flow.scala | 17 ++++++++ 4 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Flow/flattenOptional.md diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/flattenOptional.md b/akka-docs/src/main/paradox/stream/operators/Flow/flattenOptional.md new file mode 100644 index 0000000000..a559189329 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Flow/flattenOptional.md @@ -0,0 +1,29 @@ +# Flow.flattenOptional + +Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Flow.flattenOptional](Flow$) { java="#flattenOptional(akka.stream.javadsl.Flow)" } + + +## Description + +Streams the elements through the given future flow once it successfully completes. +If the future fails the stream is failed. + +## Reactive Streams semantics + +@@@div { .callout } + +**Emits when** the current @javadoc[Optional](java.util.Optional)'s value is present. + +**Backpressures when** the value of the current @javadoc[Optional](java.util.Optional) is present and downstream backpressures. + +**Completes when** upstream completes. + +**Cancels when** downstream cancels. +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index d7a4e8b970..dd3daafda5 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -150,6 +150,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| |Source/Flow|@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.| |Source/Flow|@ref[filterNot](Source-or-Flow/filterNot.md)|Filter the incoming elements using a predicate.| +|Flow|@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.| |Source/Flow|@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.| |Source/Flow|@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| |Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| @@ -423,6 +424,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [flatMapConcat](Source-or-Flow/flatMapConcat.md) * [flatMapMerge](Source-or-Flow/flatMapMerge.md) * [flatMapPrefix](Source-or-Flow/flatMapPrefix.md) +* [flattenOptional](Flow/flattenOptional.md) * [fold](Source-or-Flow/fold.md) * [fold](Sink/fold.md) * [foldAsync](Source-or-Flow/foldAsync.md) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 56c820e32b..9910d60521 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -1179,16 +1179,21 @@ public class SourceTest extends StreamTest { } @Test - public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() { + public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() throws Exception { final Iterator iterator = IntStream.range(1, 10).iterator(); final Creator> input = () -> iterator; final Done completion = - Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join(); + Source.fromIterator(input) + .map(it -> it * 10) + .run(system) + .toCompletableFuture() + .get(1, TimeUnit.SECONDS); assertEquals(Done.getInstance(), completion); } @Test - public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer() { + public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer() + throws Exception { final Materializer materializer = Materializer.createMaterializer(system); final Iterator iterator = IntStream.range(1, 10).iterator(); final Creator> input = () -> iterator; @@ -1197,7 +1202,7 @@ public class SourceTest extends StreamTest { .map(it -> it * 10) .run(materializer) .toCompletableFuture() - .join(); + .get(3, TimeUnit.SECONDS); assertEquals(Done.getInstance(), completion); } @@ -1235,4 +1240,31 @@ public class SourceTest extends StreamTest { 1346269, 2178309, 3524578, 5702887, 9227465), resultList); } + + @Test + public void flattenOptional() throws Exception { + // #flattenOptional + final CompletionStage> resultList = + Source.range(1, 10) + .map(x -> Optional.of(x).filter(n -> n % 2 == 0)) + .via(Flow.flattenOptional()) + .runWith(Sink.seq(), system); + // #flattenOptional + Assert.assertEquals( + Arrays.asList(2, 4, 6, 8, 10), resultList.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void flattenOptionalOptional() throws Exception { + final List resultList = + Source.range(1, 10) + .map(x -> Optional.of(x).filter(n -> n % 2 == 0)) + .map(Optional::ofNullable) + .via(Flow.flattenOptional()) + .via(Flow.flattenOptional()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList); + } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b12813cbcb..4170a4c604 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -371,6 +371,23 @@ object Flow { def upcast[In, SuperOut, Out <: SuperOut, M](flow: Flow[In, Out, M]): Flow[In, SuperOut, M] = flow.asInstanceOf[Flow[In, SuperOut, M]] + /** + * Collect the value of [[Optional]] from the elements passing through this flow, empty [[Optional]] is filtered out. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the current [[Optional]]'s value is present. + * + * '''Backpressures when''' the value of the current [[Optional]] is present and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * * */ + def flattenOptional[Out, In <: Optional[Out]](): Flow[In, Out, NotUsed] = + new Flow(scaladsl.Flow[In].collect { + case optional: Optional[Out @unchecked] if optional.isPresent => optional.get() + }) } /**