diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/flattenOptional.md b/akka-docs/src/main/paradox/stream/operators/Flow/flattenOptional.md
new file mode 100644
index 0000000000..a559189329
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Flow/flattenOptional.md
@@ -0,0 +1,29 @@
+# Flow.flattenOptional
+
+Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Flow.flattenOptional](Flow$) { java="#flattenOptional(akka.stream.javadsl.Flow)" }
+
+
+## Description
+
+Streams the elements through the given future flow once it successfully completes.
+If the future fails the stream is failed.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**Emits when** the current @javadoc[Optional](java.util.Optional)'s value is present.
+
+**Backpressures when** the value of the current @javadoc[Optional](java.util.Optional) is present and downstream backpressures.
+
+**Completes when** upstream completes.
+
+**Cancels when** downstream cancels.
+@@@
+
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index d7a4e8b970..dd3daafda5 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -150,6 +150,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element|
|Source/Flow|@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.|
|Source/Flow|@ref[filterNot](Source-or-Flow/filterNot.md)|Filter the incoming elements using a predicate.|
+|Flow|@ref[flattenOptional](Flow/flattenOptional.md)|Collect the value of `Optional` from all the elements passing through this flow , empty `Optional` is filtered out.|
|Source/Flow|@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.|
|Source/Flow|@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|
|Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`|
@@ -423,6 +424,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [flatMapConcat](Source-or-Flow/flatMapConcat.md)
* [flatMapMerge](Source-or-Flow/flatMapMerge.md)
* [flatMapPrefix](Source-or-Flow/flatMapPrefix.md)
+* [flattenOptional](Flow/flattenOptional.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)
* [foldAsync](Source-or-Flow/foldAsync.md)
diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
index 56c820e32b..9910d60521 100644
--- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
+++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java
@@ -1179,16 +1179,21 @@ public class SourceTest extends StreamTest {
}
@Test
- public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() {
+ public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() throws Exception {
final Iterator iterator = IntStream.range(1, 10).iterator();
final Creator> input = () -> iterator;
final Done completion =
- Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join();
+ Source.fromIterator(input)
+ .map(it -> it * 10)
+ .run(system)
+ .toCompletableFuture()
+ .get(1, TimeUnit.SECONDS);
assertEquals(Done.getInstance(), completion);
}
@Test
- public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer() {
+ public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer()
+ throws Exception {
final Materializer materializer = Materializer.createMaterializer(system);
final Iterator iterator = IntStream.range(1, 10).iterator();
final Creator> input = () -> iterator;
@@ -1197,7 +1202,7 @@ public class SourceTest extends StreamTest {
.map(it -> it * 10)
.run(materializer)
.toCompletableFuture()
- .join();
+ .get(3, TimeUnit.SECONDS);
assertEquals(Done.getInstance(), completion);
}
@@ -1235,4 +1240,31 @@ public class SourceTest extends StreamTest {
1346269, 2178309, 3524578, 5702887, 9227465),
resultList);
}
+
+ @Test
+ public void flattenOptional() throws Exception {
+ // #flattenOptional
+ final CompletionStage> resultList =
+ Source.range(1, 10)
+ .map(x -> Optional.of(x).filter(n -> n % 2 == 0))
+ .via(Flow.flattenOptional())
+ .runWith(Sink.seq(), system);
+ // #flattenOptional
+ Assert.assertEquals(
+ Arrays.asList(2, 4, 6, 8, 10), resultList.toCompletableFuture().get(3, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void flattenOptionalOptional() throws Exception {
+ final List resultList =
+ Source.range(1, 10)
+ .map(x -> Optional.of(x).filter(n -> n % 2 == 0))
+ .map(Optional::ofNullable)
+ .via(Flow.flattenOptional())
+ .via(Flow.flattenOptional())
+ .runWith(Sink.seq(), system)
+ .toCompletableFuture()
+ .get(3, TimeUnit.SECONDS);
+ Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList);
+ }
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index b12813cbcb..4170a4c604 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -371,6 +371,23 @@ object Flow {
def upcast[In, SuperOut, Out <: SuperOut, M](flow: Flow[In, Out, M]): Flow[In, SuperOut, M] =
flow.asInstanceOf[Flow[In, SuperOut, M]]
+ /**
+ * Collect the value of [[Optional]] from the elements passing through this flow, empty [[Optional]] is filtered out.
+ *
+ * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
+ *
+ * '''Emits when''' the current [[Optional]]'s value is present.
+ *
+ * '''Backpressures when''' the value of the current [[Optional]] is present and downstream backpressures
+ *
+ * '''Completes when''' upstream completes
+ *
+ * '''Cancels when''' downstream cancels
+ * * */
+ def flattenOptional[Out, In <: Optional[Out]](): Flow[In, Out, NotUsed] =
+ new Flow(scaladsl.Flow[In].collect {
+ case optional: Optional[Out @unchecked] if optional.isPresent => optional.get()
+ })
}
/**