diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index 7461109c0e..6e4e8af653 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -50,13 +50,13 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d => import GraphDSL.Implicits._ - val bcast = b.add(Broadcast[(Out, CtxOut)](2)) + val unzip = b.add(Unzip[Out, CtxOut]()) val zipper = b.add(Zip[Out2, CtxOut]()) - d ~> bcast.in + d ~> unzip.in - bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0 - bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1 + unzip.out0.via(viaFlow) ~> zipper.in0 + unzip.out1 ~> zipper.in1 FlowShape(d.in, zipper.out) })) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index 6ee7dbeb7d..3eba12466b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -37,13 +37,13 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d => import GraphDSL.Implicits._ - val bcast = b.add(Broadcast[(Out, Ctx)](2)) + val unzip = b.add(Unzip[Out, Ctx]()) val zipper = b.add(Zip[Out2, Ctx]()) - d ~> bcast.in + d ~> unzip.in - bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0 - bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1 + unzip.out0.via(viaFlow) ~> zipper.in0 + unzip.out1 ~> zipper.in1 SourceShape(zipper.out) }))