From adc1d669cef6ca724b18c17aa80be331b493508a Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Fri, 2 May 2025 14:35:25 +0200 Subject: [PATCH] feat: Add materializeIntoSource --- .../Source-or-Flow/materializeIntoSource.md | 15 +++++++++++++ .../main/paradox/stream/operators/index.md | 2 ++ .../apache/pekko/stream/javadsl/FlowTest.java | 12 ++++++++++ .../pekko/stream/javadsl/SourceTest.java | 10 +++++++++ .../pekko/stream/scaladsl/FlowSpec.scala | 6 +++++ .../pekko/stream/scaladsl/SourceSpec.scala | 7 ++++++ .../apache/pekko/stream/javadsl/Flow.scala | 22 +++++++++++++++++++ .../apache/pekko/stream/javadsl/Source.scala | 16 ++++++++++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 20 +++++++++++++++++ .../apache/pekko/stream/scaladsl/Source.scala | 15 +++++++++++++ 10 files changed, 125 insertions(+) create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/materializeIntoSource.md diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/materializeIntoSource.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/materializeIntoSource.md new file mode 100644 index 0000000000..7c43e4d3fb --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/materializeIntoSource.md @@ -0,0 +1,15 @@ +# materializeIntoSource + +Materializes this Graph, immediately returning its materialized values into a new Source. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.materializeIntoSource](Source) { scala="#materializeIntoSource[Mat2](sink:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SinkShape[Out],scala.concurrent.Future[Mat2]]):org.apache.pekko.stream.scaladsl.Source[Mat2,scala.concurrent.Future[org.apache.pekko.NotUsed]]" java="#materializeIntoSource(org.apache.pekko.stream.Graph)" } +@apidoc[Flow.materializeIntoSource](Flow) { scala="#materializeIntoSource[Mat1,Mat2](source:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[In],Mat1],sink:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SinkShape[Out],scala.concurrent.Future[Mat2]]):org.apache.pekko.stream.scaladsl.Source[Mat2,scala.concurrent.Future[org.apache.pekko.NotUsed]]" java="#materialize(org.apache.pekko.actor.ClassicActorSystemProvider)" java="#materializeIntoSource(org.apache.pekko.stream.Graph,org.apache.pekko.stream.Graph)" } + + +## Description + +Materializes this Graph, immediately returning its materialized values into a new Source. diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 120cc91df0..8f46341db3 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -172,6 +172,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.| |Source/Flow|@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.| +|Source/Flow|@ref[materializeIntoSource](Source-or-Flow/materializeIntoSource.md)|Materializes this Graph, immediately returning its materialized values into a new Source.| |Source/Flow|@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.| |Source/Flow|@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.| |Source/Flow|@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.| @@ -529,6 +530,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [mapConcat](Source-or-Flow/mapConcat.md) * [mapError](Source-or-Flow/mapError.md) * [mapWithResource](Source-or-Flow/mapWithResource.md) +* [materializeIntoSource](Source-or-Flow/materializeIntoSource.md) * [maybe](Source/maybe.md) * [merge](Source-or-Flow/merge.md) * [mergeAll](Source-or-Flow/mergeAll.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 0a45acc2bd..c8cb7f06fc 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1692,6 +1692,18 @@ public class FlowTest extends StreamTest { Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo"); } + @Test + public void mustBeAbleToUseMaterializeIntoSource() throws Exception { + final Flow flow = Flow.create(); + + final Source, CompletionStage> source = + flow.map(i -> i * 2).materializeIntoSource(Source.from(Arrays.asList(1, 2, 3)), Sink.seq()); + + final CompletionStage> resultList = source.runWith(Sink.head(), system); + + assertEquals(Arrays.asList(2, 4, 6), resultList.toCompletableFuture().get(1, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToUseLazyInit() throws Exception { final CompletionStage> future = new CompletableFuture<>(); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index e6bbd01be0..ed9946234d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1441,6 +1441,16 @@ public class SourceTest extends StreamTest { Source.empty().preMaterialize(system); } + @Test + public void mustBeAbleToUseMaterializeIntoSource() throws Exception { + final List input = Arrays.asList(1, 2, 3); + final Source, CompletionStage> source = + Source.from(input).materializeIntoSource(Sink.seq()); + final CompletionStage> resultList = source.runWith(Sink.head(), system); + + assertEquals(input, resultList.toCompletableFuture().get(1, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToConvertToJavaInJava() { final org.apache.pekko.stream.scaladsl.Source scalaSource = diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala index 3c918db1ec..0364f130cb 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala @@ -639,6 +639,12 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r counter.get() should (be(0)) } + "materialize into source" in { + val source = Flow[Int].map(_ * 2) + .materializeIntoSource(Source(List(1, 2, 3)), Sink.seq) + + source.runWith(Sink.head).futureValue should ===(List(2, 4, 6)) + } } object TestException extends RuntimeException with NoStackTrace diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index a54a17a4c7..cb37d951e6 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -526,6 +526,13 @@ class SourceSpec extends StreamSpec with DefaultTimeout { a[RuntimeException] shouldBe thrownBy(matValPoweredSource.preMaterialize()) } + + "materialize into source" in { + val input = List(1, 2, 3) + val source = Source(input).materializeIntoSource(Sink.seq) + + source.runWith(Sink.head).futureValue should ===(input) + } } "Source.futureSource" must { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 150096dc65..4eedf5f2ca 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -622,6 +622,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr pekko.japi.Pair(som, sim) } + /** + * Connects the [[Source]] to this [[Flow]] and materializes it using the [[Sink]], immediately returning the values + * via the provided [[Sink]] as a new [[Source]]. + * + * @param source A source that connects to this flow + * @param sink A sink which needs to materialize into a [[CompletionStage]], typically one + * that collects values such as [[Sink.head]] or [[Sink.seq]] + * @return A new [[Source]] that contains the results of the [[Flow]] with the provided + * [[Source]]'s elements run with the [[Sink]] + * @since 1.2.0 + */ + def materializeIntoSource[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], + sink: Graph[SinkShape[Out], CompletionStage[Mat2]]) + : Source[Mat2, CompletionStage[NotUsed]] = { + Source.fromMaterializer { (mat, attr) => + Source.completionStage( + Source.fromGraph(source).via(this).withAttributes(attr).toMat(sink, + Keep.right[Mat1, CompletionStage[Mat2]]).run(mat) + ) + } + } + /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 2673791b3c..13b473e554 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -876,6 +876,22 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ Pair(mat, new Source(src)) } + /** + * Materializes this [[Source]] using the [[Sink]], immediately returning the values via the + * provided [[Sink]] as a new [[Source]]. + * + * @param sink A sink which needs to materialize into a [[CompletionStage]], typically one + * that collects values such as [[Sink.head]] or [[Sink.seq]] + * @return A new [[Source]] that contains the results of the provided [[Source]]'s + * elements run with the [[Sink]] + * @since 1.2.0 + */ + def materializeIntoSource[Mat2]( + sink: Graph[SinkShape[Out], CompletionStage[Mat2]]): Source[Mat2, CompletionStage[NotUsed]] = + Source.fromMaterializer { (mat, attr) => + Source.completionStage(this.withAttributes(attr).toMat(sink, Keep.right[Mat, CompletionStage[Mat2]]).run(mat)) + } + /** * Transform this [[Source]] by appending the given processing operators. * {{{ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 5ecbd5c81a..49c8ce7d07 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -172,6 +172,26 @@ final class Flow[-In, +Out, +Mat]( (mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), Source.fromPublisher(pub))) } + /** + * Connects the [[Source]] to this [[Flow]] and materializes it using the [[Sink]], immediately returning the values + * via the provided [[Sink]] as a new [[Source]]. + * + * @param source A source that connects to this flow + * @param sink A sink which needs to materialize into a [[Future]], typically one + * that collects values such as [[Sink.head]] or [[Sink.seq]] + * @return A new [[Source]] that contains the results of the [[Flow]] with the provided + * [[Source]]'s elements run with the [[Sink]] + * @since 1.2.0 + */ + def materializeIntoSource[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], + sink: Graph[SinkShape[Out], Future[Mat2]]) + : Source[Mat2, Future[NotUsed]] = + Source.fromMaterializer { (mat, attr) => + Source.future( + Source.fromGraph(source).via(this).withAttributes(attr).toMat(sink)(Keep.right).run()(mat) + ) + } + /** * Transform this Flow by applying a function to each *incoming* upstream element before * it is passed to the [[Flow]] diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 54ace5a111..9dc14dd4f1 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -109,6 +109,21 @@ final class Source[+Out, +Mat]( (mat, Source.fromPublisher(pub)) } + /** + * Materializes this [[Source]] using the [[Sink]], immediately returning the values via the + * provided [[Sink]] as a new [[Source]]. + * + * @param sink A sink which needs to materialize into a [[Future]], typically one + * that collects values such as [[Sink.head]] or [[Sink.seq]] + * @return A new [[Source]] that contains the results of the provided [[Source]]'s + * elements run with the [[Sink]] + * @since 1.2.0 + */ + def materializeIntoSource[Mat2](sink: Graph[SinkShape[Out], Future[Mat2]]): Source[Mat2, Future[NotUsed]] = + Source.fromMaterializer { (mat, attr) => + Source.future(this.withAttributes(attr).toMat(sink)(Keep.right).run()(mat)) + } + /** * Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded. *