Replace broadcast with unzip for unsafeDataVia (#31440)

This commit is contained in:
Matthew de Detrich 2022-08-23 16:09:17 +02:00 committed by GitHub
parent d6ed0f7b27
commit cea21259d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 8 deletions

View file

@ -50,13 +50,13 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[(Out, CtxOut)](2))
val unzip = b.add(Unzip[Out, CtxOut]())
val zipper = b.add(Zip[Out2, CtxOut]())
d ~> bcast.in
d ~> unzip.in
bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0
bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1
unzip.out0.via(viaFlow) ~> zipper.in0
unzip.out1 ~> zipper.in1
FlowShape(d.in, zipper.out)
}))

View file

@ -37,13 +37,13 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[(Out, Ctx)](2))
val unzip = b.add(Unzip[Out, Ctx]())
val zipper = b.add(Zip[Out2, Ctx]())
d ~> bcast.in
d ~> unzip.in
bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0
bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1
unzip.out0.via(viaFlow) ~> zipper.in0
unzip.out1 ~> zipper.in1
SourceShape(zipper.out)
}))