+str Add Flow#preMaterialize. (#31190)

This commit is contained in:
kerr 2022-04-01 16:05:52 +08:00 committed by GitHub
parent b5c29e05c0
commit db3b283034
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 126 additions and 6 deletions

View file

@ -0,0 +1,16 @@
# preMaterialize
Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Source.preMaterialize](Source) { scala="#preMaterialize()(implicitmaterializer:akka.stream.Materializer):(Mat,akka.stream.scaladsl.Source[Out,akka.NotUsed])" java="#preMaterialize(akka.actor.ClassicActorSystemProvider)" java="#preMaterialize(akka.stream.Materializer)" }
@apidoc[Flow.preMaterialize](Flow) { scala="#preMaterialize()(implicitmaterializer:akka.stream.Materializer):(Mat,akka.stream.scaladsl.Flow[Int,Out,akka.NotUsed])" java="#preMaterialize(akka.actor.ClassicActorSystemProvider)" java="#preMaterialize(akka.stream.Materializer)" }
## Description
Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.

View file

@ -100,8 +100,8 @@ For example, following snippet will fall with timeout exception:
```scala
...
.toMat(StreamConverters.asInputStream().mapMaterializedValue { inputStream =>
inputStream.read() // this could block forever
...
inputStream.read() // this could block forever
...
}).run()
```
@ -166,6 +166,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="logwithmarker"></a>@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow|<a name="prematerialize"></a>@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
|Source/Flow|<a name="scanasync"></a>@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like @ref[`scan`](Source-or-Flow/./scan.md) but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.|

View file

@ -5,23 +5,24 @@
package akka.stream.scaladsl
import java.util.concurrent.ThreadLocalRandom
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import scala.annotation.nowarn
import com.typesafe.config.ConfigFactory
import org.reactivestreams.{ Publisher, Subscriber }
import akka.NotUsed
import akka.stream._
import akka.stream.impl._
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.TestDuration
import java.util.concurrent.atomic.AtomicLong
object FlowSpec {
class Fruit extends Serializable
class Apple extends Fruit
@ -521,6 +522,81 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re
}
}
/**
* Count elements that passing by this flow
* */
private class CounterFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], AtomicLong] {
private val in = Inlet[T]("ElementCounterFlow.in")
private val out = Outlet[T]("ElementCounterFlow.out")
val shape = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, AtomicLong) = {
val counter = new AtomicLong()
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
counter.incrementAndGet()
push(out, grab(in))
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
(logic, counter)
}
}
"Flow pre-materialization" must {
"passing elements to downstream" in {
val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize()
val probe = Source(List(1, 2, 3)).via(flow).reduce((a, b) => a + b).runWith(TestSink.probe[Int])
probe.request(1)
probe.expectNext(6)
probe.request(1)
probe.expectComplete()
counter.get() should (be(3))
}
"propagate failures to downstream" in {
val (queue, source) = Source.queue[Int](1).preMaterialize()
val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize()
val probe = source.via(flow).runWith(TestSink.probe[Int])
queue.offer(1)
probe.request(1)
probe.expectNext(1)
queue.fail(new RuntimeException("boom"))
probe.expectError().getMessage should ===("boom")
counter.get() should (be(1))
}
"disallow materialize multiple times" in {
val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize()
val probe1 = Source(List(1, 2, 3)).via(flow).reduce((a, b) => a + b).runWith(TestSink.probe[Int])
probe1.request(1)
probe1.expectNext(6)
probe1.request(1)
probe1.expectComplete()
counter.get() should (be(3))
val probe2 = Source(List(1, 2, 3)).via(flow).reduce((a, b) => a + b).runWith(TestSink.probe[Int])
probe2.request(1)
probe2.expectError()
}
"propagate failure to downstream when materializing" in {
a[RuntimeException] shouldBe thrownBy(
Flow
.fromGraph(new CounterFlow[Int])
.mapMaterializedValue(_ => throw new RuntimeException("boom"))
.preMaterialize())
}
"propagate cancel to upstream" in {
val (counter, flow) = Flow.fromGraph(new CounterFlow[Int]).preMaterialize()
val probSource = TestSource.probe[Int].via(flow).toMat(Sink.cancelled[Int])(Keep.left).run()
probSource.ensureSubscription()
probSource.expectCancellation()
counter.get() should (be(0))
}
}
object TestException extends RuntimeException with NoStackTrace
}

View file

@ -393,6 +393,23 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
new Flow(delegate.mapMaterializedValue(f.apply _))
/**
* Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]].
*/
def preMaterialize(
systemProvider: ClassicActorSystemProvider): akka.japi.Pair[Mat @uncheckedVariance, Flow[In, Out, NotUsed]] = {
preMaterialize(SystemMaterializer(systemProvider.classicSystem).materializer)
}
/**
* Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]].
* The returned flow is partial materialized and do not support multiple times materialization.
*/
def preMaterialize(materializer: Materializer): akka.japi.Pair[Mat @uncheckedVariance, Flow[In, Out, NotUsed]] = {
val (mat, flow) = delegate.preMaterialize()(materializer)
akka.japi.Pair(mat, flow.asJava)
}
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{

View file

@ -145,6 +145,16 @@ final class Flow[-In, +Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
new Flow(traversalBuilder.transformMat(f), shape)
/**
* Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]].
* The returned flow is partial materialized and do not support multiple times materialization.
*/
def preMaterialize()(implicit materializer: Materializer): (Mat, ReprMat[Out, NotUsed]) = {
val ((sub, mat), pub) =
Source.asSubscriber[In].viaMat(this)(Keep.both).toMat(Sink.asPublisher(false))(Keep.both).run()
(mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), Source.fromPublisher(pub)))
}
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{

View file

@ -74,7 +74,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
// FIXME document these methods as well
val pendingTestCases = Map(
"Source" -> (pendingSourceOrFlow ++ Seq("preMaterialize")),
"Source" -> (pendingSourceOrFlow),
"Flow" -> (pendingSourceOrFlow ++ Seq(
"lazyInit",
"fromProcessorMat",