feat: Add materializeIntoSource
This commit is contained in:
parent
28783fad61
commit
adc1d669ce
10 changed files with 125 additions and 0 deletions
|
|
@ -0,0 +1,15 @@
|
|||
# materializeIntoSource
|
||||
|
||||
Materializes this Graph, immediately returning its materialized values into a new Source.
|
||||
|
||||
@ref[Simple operators](../index.md#simple-operators)
|
||||
|
||||
## Signature
|
||||
|
||||
@apidoc[Source.materializeIntoSource](Source) { scala="#materializeIntoSource[Mat2](sink:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SinkShape[Out],scala.concurrent.Future[Mat2]]):org.apache.pekko.stream.scaladsl.Source[Mat2,scala.concurrent.Future[org.apache.pekko.NotUsed]]" java="#materializeIntoSource(org.apache.pekko.stream.Graph)" }
|
||||
@apidoc[Flow.materializeIntoSource](Flow) { scala="#materializeIntoSource[Mat1,Mat2](source:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SourceShape[In],Mat1],sink:org.apache.pekko.stream.Graph[org.apache.pekko.stream.SinkShape[Out],scala.concurrent.Future[Mat2]]):org.apache.pekko.stream.scaladsl.Source[Mat2,scala.concurrent.Future[org.apache.pekko.NotUsed]]" java="#materialize(org.apache.pekko.actor.ClassicActorSystemProvider)" java="#materializeIntoSource(org.apache.pekko.stream.Graph,org.apache.pekko.stream.Graph)" }
|
||||
|
||||
|
||||
## Description
|
||||
|
||||
Materializes this Graph, immediately returning its materialized values into a new Source.
|
||||
|
|
@ -172,6 +172,7 @@ depending on being backpressured by downstream or not.
|
|||
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|
||||
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|
||||
|Source/Flow|<a name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
|
||||
|Source/Flow|<a name="materializeintosource"></a>@ref[materializeIntoSource](Source-or-Flow/materializeIntoSource.md)|Materializes this Graph, immediately returning its materialized values into a new Source.|
|
||||
|Source/Flow|<a name="optionalvia"></a>@ref[optionalVia](Source-or-Flow/optionalVia.md)|For a stream containing optional elements, transforms each element by applying the given `viaFlow` and passing the value downstream as an optional value.|
|
||||
|Source/Flow|<a name="prematerialize"></a>@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|
||||
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|
||||
|
|
@ -529,6 +530,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
|||
* [mapConcat](Source-or-Flow/mapConcat.md)
|
||||
* [mapError](Source-or-Flow/mapError.md)
|
||||
* [mapWithResource](Source-or-Flow/mapWithResource.md)
|
||||
* [materializeIntoSource](Source-or-Flow/materializeIntoSource.md)
|
||||
* [maybe](Source/maybe.md)
|
||||
* [merge](Source-or-Flow/merge.md)
|
||||
* [mergeAll](Source-or-Flow/mergeAll.md)
|
||||
|
|
|
|||
|
|
@ -1692,6 +1692,18 @@ public class FlowTest extends StreamTest {
|
|||
Flow.of(Integer.class).divertToMat(Sink.ignore(), e -> true, (i, n) -> "foo");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseMaterializeIntoSource() throws Exception {
|
||||
final Flow<Integer, Integer, NotUsed> flow = Flow.create();
|
||||
|
||||
final Source<List<Integer>, CompletionStage<NotUsed>> source =
|
||||
flow.map(i -> i * 2).materializeIntoSource(Source.from(Arrays.asList(1, 2, 3)), Sink.seq());
|
||||
|
||||
final CompletionStage<List<Integer>> resultList = source.runWith(Sink.head(), system);
|
||||
|
||||
assertEquals(Arrays.asList(2, 4, 6), resultList.toCompletableFuture().get(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseLazyInit() throws Exception {
|
||||
final CompletionStage<Flow<Integer, Integer, NotUsed>> future = new CompletableFuture<>();
|
||||
|
|
|
|||
|
|
@ -1441,6 +1441,16 @@ public class SourceTest extends StreamTest {
|
|||
Source.<Integer>empty().preMaterialize(system);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseMaterializeIntoSource() throws Exception {
|
||||
final List<Integer> input = Arrays.asList(1, 2, 3);
|
||||
final Source<List<Integer>, CompletionStage<NotUsed>> source =
|
||||
Source.from(input).materializeIntoSource(Sink.seq());
|
||||
final CompletionStage<List<Integer>> resultList = source.runWith(Sink.head(), system);
|
||||
|
||||
assertEquals(input, resultList.toCompletableFuture().get(1, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToConvertToJavaInJava() {
|
||||
final org.apache.pekko.stream.scaladsl.Source<Integer, NotUsed> scalaSource =
|
||||
|
|
|
|||
|
|
@ -639,6 +639,12 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
|
|||
counter.get() should (be(0))
|
||||
}
|
||||
|
||||
"materialize into source" in {
|
||||
val source = Flow[Int].map(_ * 2)
|
||||
.materializeIntoSource(Source(List(1, 2, 3)), Sink.seq)
|
||||
|
||||
source.runWith(Sink.head).futureValue should ===(List(2, 4, 6))
|
||||
}
|
||||
}
|
||||
|
||||
object TestException extends RuntimeException with NoStackTrace
|
||||
|
|
|
|||
|
|
@ -526,6 +526,13 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
|
|||
|
||||
a[RuntimeException] shouldBe thrownBy(matValPoweredSource.preMaterialize())
|
||||
}
|
||||
|
||||
"materialize into source" in {
|
||||
val input = List(1, 2, 3)
|
||||
val source = Source(input).materializeIntoSource(Sink.seq)
|
||||
|
||||
source.runWith(Sink.head).futureValue should ===(input)
|
||||
}
|
||||
}
|
||||
|
||||
"Source.futureSource" must {
|
||||
|
|
|
|||
|
|
@ -622,6 +622,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
|||
pekko.japi.Pair(som, sim)
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects the [[Source]] to this [[Flow]] and materializes it using the [[Sink]], immediately returning the values
|
||||
* via the provided [[Sink]] as a new [[Source]].
|
||||
*
|
||||
* @param source A source that connects to this flow
|
||||
* @param sink A sink which needs to materialize into a [[CompletionStage]], typically one
|
||||
* that collects values such as [[Sink.head]] or [[Sink.seq]]
|
||||
* @return A new [[Source]] that contains the results of the [[Flow]] with the provided
|
||||
* [[Source]]'s elements run with the [[Sink]]
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def materializeIntoSource[Mat1, Mat2](source: Graph[SourceShape[In], Mat1],
|
||||
sink: Graph[SinkShape[Out], CompletionStage[Mat2]])
|
||||
: Source[Mat2, CompletionStage[NotUsed]] = {
|
||||
Source.fromMaterializer { (mat, attr) =>
|
||||
Source.completionStage(
|
||||
Source.fromGraph(source).via(this).withAttributes(attr).toMat(sink,
|
||||
Keep.right[Mat1, CompletionStage[Mat2]]).run(mat)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -876,6 +876,22 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
|||
Pair(mat, new Source(src))
|
||||
}
|
||||
|
||||
/**
|
||||
* Materializes this [[Source]] using the [[Sink]], immediately returning the values via the
|
||||
* provided [[Sink]] as a new [[Source]].
|
||||
*
|
||||
* @param sink A sink which needs to materialize into a [[CompletionStage]], typically one
|
||||
* that collects values such as [[Sink.head]] or [[Sink.seq]]
|
||||
* @return A new [[Source]] that contains the results of the provided [[Source]]'s
|
||||
* elements run with the [[Sink]]
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def materializeIntoSource[Mat2](
|
||||
sink: Graph[SinkShape[Out], CompletionStage[Mat2]]): Source[Mat2, CompletionStage[NotUsed]] =
|
||||
Source.fromMaterializer { (mat, attr) =>
|
||||
Source.completionStage(this.withAttributes(attr).toMat(sink, Keep.right[Mat, CompletionStage[Mat2]]).run(mat))
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform this [[Source]] by appending the given processing operators.
|
||||
* {{{
|
||||
|
|
|
|||
|
|
@ -172,6 +172,26 @@ final class Flow[-In, +Out, +Mat](
|
|||
(mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), Source.fromPublisher(pub)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects the [[Source]] to this [[Flow]] and materializes it using the [[Sink]], immediately returning the values
|
||||
* via the provided [[Sink]] as a new [[Source]].
|
||||
*
|
||||
* @param source A source that connects to this flow
|
||||
* @param sink A sink which needs to materialize into a [[Future]], typically one
|
||||
* that collects values such as [[Sink.head]] or [[Sink.seq]]
|
||||
* @return A new [[Source]] that contains the results of the [[Flow]] with the provided
|
||||
* [[Source]]'s elements run with the [[Sink]]
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def materializeIntoSource[Mat1, Mat2](source: Graph[SourceShape[In], Mat1],
|
||||
sink: Graph[SinkShape[Out], Future[Mat2]])
|
||||
: Source[Mat2, Future[NotUsed]] =
|
||||
Source.fromMaterializer { (mat, attr) =>
|
||||
Source.future(
|
||||
Source.fromGraph(source).via(this).withAttributes(attr).toMat(sink)(Keep.right).run()(mat)
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform this Flow by applying a function to each *incoming* upstream element before
|
||||
* it is passed to the [[Flow]]
|
||||
|
|
|
|||
|
|
@ -109,6 +109,21 @@ final class Source[+Out, +Mat](
|
|||
(mat, Source.fromPublisher(pub))
|
||||
}
|
||||
|
||||
/**
|
||||
* Materializes this [[Source]] using the [[Sink]], immediately returning the values via the
|
||||
* provided [[Sink]] as a new [[Source]].
|
||||
*
|
||||
* @param sink A sink which needs to materialize into a [[Future]], typically one
|
||||
* that collects values such as [[Sink.head]] or [[Sink.seq]]
|
||||
* @return A new [[Source]] that contains the results of the provided [[Source]]'s
|
||||
* elements run with the [[Sink]]
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def materializeIntoSource[Mat2](sink: Graph[SinkShape[Out], Future[Mat2]]): Source[Mat2, Future[NotUsed]] =
|
||||
Source.fromMaterializer { (mat, attr) =>
|
||||
Source.future(this.withAttributes(attr).toMat(sink)(Keep.right).run()(mat))
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded.
|
||||
*
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue