diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index e301bfb822..405920a9ec 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -440,6 +440,32 @@ class FlowGraphCompileSpec extends AkkaSpec { } } + "Junction is connected through GraphFlow" in { + val gflow = Flow[Int, String]() { implicit builder ⇒ + import FlowGraphImplicits._ + + val in = UndefinedSource[Int] + val out = UndefinedSink[String] + + in ~> Flow[Int].map(_.toString) ~> out + + (in, out) + } + + val sink = Sink.fold[Int, Int](0)(_ + _) + val graph = FlowGraph { implicit builder ⇒ + import FlowGraphImplicits._ + + val merge = Merge[Int] + + Source(List(1, 2, 3)) ~> merge + Source.empty[Int] ~> merge + merge ~> gflow.map(_.toInt) ~> sink + } + + graph.run() + } + "UndefinedSource is connected directly" in { PartialFlowGraph { implicit b ⇒ import FlowGraphImplicits._ @@ -459,6 +485,7 @@ class FlowGraphCompileSpec extends AkkaSpec { UndefinedSource[Int] ~> Flow[Int] ~> UndefinedSink[Int] } } + } "build partial with only undefined sources and sinks" in { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index 5b2654e035..f1bded5f30 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -702,6 +702,8 @@ class FlowGraphBuilder private[akka] ( connect(tOut, flow, tIn) case (pipe: Pipe[In, Out], sink: Sink[Out]) ⇒ addPipeToSinkEdge(junctionOut, pipe, sink) + case (gf: GraphFlow[_, Out, _, _], sink: Sink[Out]) ⇒ + addPipeToSinkEdge(junctionOut, gf.inPipe, sink) case x ⇒ throwUnsupportedValue(x) } this