=str #16609 add missing handling of FlowGraphs when connecting
This commit is contained in:
parent
840575afe9
commit
08f77bcad0
2 changed files with 29 additions and 0 deletions
|
|
@ -440,6 +440,32 @@ class FlowGraphCompileSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"Junction is connected through GraphFlow" in {
|
||||||
|
val gflow = Flow[Int, String]() { implicit builder ⇒
|
||||||
|
import FlowGraphImplicits._
|
||||||
|
|
||||||
|
val in = UndefinedSource[Int]
|
||||||
|
val out = UndefinedSink[String]
|
||||||
|
|
||||||
|
in ~> Flow[Int].map(_.toString) ~> out
|
||||||
|
|
||||||
|
(in, out)
|
||||||
|
}
|
||||||
|
|
||||||
|
val sink = Sink.fold[Int, Int](0)(_ + _)
|
||||||
|
val graph = FlowGraph { implicit builder ⇒
|
||||||
|
import FlowGraphImplicits._
|
||||||
|
|
||||||
|
val merge = Merge[Int]
|
||||||
|
|
||||||
|
Source(List(1, 2, 3)) ~> merge
|
||||||
|
Source.empty[Int] ~> merge
|
||||||
|
merge ~> gflow.map(_.toInt) ~> sink
|
||||||
|
}
|
||||||
|
|
||||||
|
graph.run()
|
||||||
|
}
|
||||||
|
|
||||||
"UndefinedSource is connected directly" in {
|
"UndefinedSource is connected directly" in {
|
||||||
PartialFlowGraph { implicit b ⇒
|
PartialFlowGraph { implicit b ⇒
|
||||||
import FlowGraphImplicits._
|
import FlowGraphImplicits._
|
||||||
|
|
@ -459,6 +485,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
|
||||||
UndefinedSource[Int] ~> Flow[Int] ~> UndefinedSink[Int]
|
UndefinedSource[Int] ~> Flow[Int] ~> UndefinedSink[Int]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"build partial with only undefined sources and sinks" in {
|
"build partial with only undefined sources and sinks" in {
|
||||||
|
|
|
||||||
|
|
@ -702,6 +702,8 @@ class FlowGraphBuilder private[akka] (
|
||||||
connect(tOut, flow, tIn)
|
connect(tOut, flow, tIn)
|
||||||
case (pipe: Pipe[In, Out], sink: Sink[Out]) ⇒
|
case (pipe: Pipe[In, Out], sink: Sink[Out]) ⇒
|
||||||
addPipeToSinkEdge(junctionOut, pipe, sink)
|
addPipeToSinkEdge(junctionOut, pipe, sink)
|
||||||
|
case (gf: GraphFlow[_, Out, _, _], sink: Sink[Out]) ⇒
|
||||||
|
addPipeToSinkEdge(junctionOut, gf.inPipe, sink)
|
||||||
case x ⇒ throwUnsupportedValue(x)
|
case x ⇒ throwUnsupportedValue(x)
|
||||||
}
|
}
|
||||||
this
|
this
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue