+str Add Flow#flattenOptional (#31285)

This commit is contained in:
kerr 2022-04-20 19:06:16 +08:00 committed by GitHub
parent a1e88f2f25
commit 4949b12c5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 4 deletions

View file

@ -0,0 +1,29 @@
# Flow.flattenOptional
Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Flow.flattenOptional](Flow$) { java="#flattenOptional(akka.stream.javadsl.Flow)" }
## Description
Streams the elements through the given future flow once it successfully completes.
If the future fails the stream is failed.
## Reactive Streams semantics
@@@div { .callout }
**Emits when** the current @javadoc[Optional](java.util.Optional)'s value is present.
**Backpressures when** the value of the current @javadoc[Optional](java.util.Optional) is present and downstream backpressures.
**Completes when** upstream completes.
**Cancels when** downstream cancels.
@@@

View file

@ -150,6 +150,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element|
|Source/Flow|<a name="filter"></a>@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.|
|Source/Flow|<a name="filternot"></a>@ref[filterNot](Source-or-Flow/filterNot.md)|Filter the incoming elements using a predicate.|
|Flow|<a name="flattenoptional"></a>@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.|
|Source/Flow|<a name="fold"></a>@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.|
|Source/Flow|<a name="foldasync"></a>@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
|Source/Flow|<a name="frommaterializer"></a>@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
@ -423,6 +424,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [flatMapConcat](Source-or-Flow/flatMapConcat.md)
* [flatMapMerge](Source-or-Flow/flatMapMerge.md)
* [flatMapPrefix](Source-or-Flow/flatMapPrefix.md)
* [flattenOptional](Flow/flattenOptional.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)
* [foldAsync](Source-or-Flow/foldAsync.md)

View file

@ -1179,16 +1179,21 @@ public class SourceTest extends StreamTest {
}
@Test
public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() {
public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() throws Exception {
final Iterator<Integer> iterator = IntStream.range(1, 10).iterator();
final Creator<Iterator<Integer>> input = () -> iterator;
final Done completion =
Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join();
Source.fromIterator(input)
.map(it -> it * 10)
.run(system)
.toCompletableFuture()
.get(1, TimeUnit.SECONDS);
assertEquals(Done.getInstance(), completion);
}
@Test
public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer() {
public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer()
throws Exception {
final Materializer materializer = Materializer.createMaterializer(system);
final Iterator<Integer> iterator = IntStream.range(1, 10).iterator();
final Creator<Iterator<Integer>> input = () -> iterator;
@ -1197,7 +1202,7 @@ public class SourceTest extends StreamTest {
.map(it -> it * 10)
.run(materializer)
.toCompletableFuture()
.join();
.get(3, TimeUnit.SECONDS);
assertEquals(Done.getInstance(), completion);
}
@ -1235,4 +1240,31 @@ public class SourceTest extends StreamTest {
1346269, 2178309, 3524578, 5702887, 9227465),
resultList);
}
@Test
public void flattenOptional() throws Exception {
// #flattenOptional
final CompletionStage<List<Integer>> resultList =
Source.range(1, 10)
.map(x -> Optional.of(x).filter(n -> n % 2 == 0))
.via(Flow.flattenOptional())
.runWith(Sink.seq(), system);
// #flattenOptional
Assert.assertEquals(
Arrays.asList(2, 4, 6, 8, 10), resultList.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
public void flattenOptionalOptional() throws Exception {
final List<Integer> resultList =
Source.range(1, 10)
.map(x -> Optional.of(x).filter(n -> n % 2 == 0))
.map(Optional::ofNullable)
.via(Flow.flattenOptional())
.via(Flow.flattenOptional())
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList);
}
}

View file

@ -371,6 +371,23 @@ object Flow {
def upcast[In, SuperOut, Out <: SuperOut, M](flow: Flow[In, Out, M]): Flow[In, SuperOut, M] =
flow.asInstanceOf[Flow[In, SuperOut, M]]
/**
* Collect the value of [[Optional]] from the elements passing through this flow, empty [[Optional]] is filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the current [[Optional]]'s value is present.
*
* '''Backpressures when''' the value of the current [[Optional]] is present and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
* * */
def flattenOptional[Out, In <: Optional[Out]](): Flow[In, Out, NotUsed] =
new Flow(scaladsl.Flow[In].collect {
case optional: Optional[Out @unchecked] if optional.isPresent => optional.get()
})
}
/**