diff --git a/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala index 61ae6fedeb..3f58420996 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala @@ -78,13 +78,13 @@ class CompositionDocSpec extends AkkaSpec { //#complex-graph import FlowGraph.Implicits._ RunnableGraph.fromGraph(FlowGraph.create() { implicit builder => - val A: Outlet[Int] = builder.add(Source.single(0)) + val A: Outlet[Int] = builder.add(Source.single(0)).outlet val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2)) val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1)) val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2)) val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) - val G: Inlet[Any] = builder.add(Sink.foreach(println)) + val G: Inlet[Any] = builder.add(Sink.foreach(println)).inlet C <~ F A ~> B ~> C ~> F diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala index 468465bfd1..49b4024b93 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/MessageSpec.scala @@ -819,8 +819,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec { val messageHandler: Flow[Message, Message, Unit] = Flow.fromGraph { FlowGraph.create() { implicit b ⇒ - val in = b.add(Sink(messageIn)) - val out = b.add(Source(messageOut)) + val in = b.add(Sink(messageIn)).inlet + val out = b.add(Source(messageOut)).outlet FlowShape[Message, Message](in, out) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index b41196843d..302d9d1b86 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -103,7 +103,7 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { sink ⇒ val flow = b.add(Flow[Long].grouped(10)) flow ~> sink - FlowShape(flow.inlet, b.add(Source.single(ByteString("10")))) + FlowShape(flow.inlet, b.add(Source.single(ByteString("10"))).outlet) }) val ((l, m), r) = left.joinMat(bidiMat)(Keep.both).joinMat(right)(Keep.both).run() Await.result(l, 1.second) should ===(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala index 276236701f..75da9d9253 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala @@ -44,7 +44,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "not work from Outlets" in { RunnableGraph.fromGraph(FlowGraph.create() { implicit b ⇒ - val o: Outlet[Int] = b.add(source) + val o: Outlet[Int] = b.add(source).outlet "o <~ source" shouldNot compile sink <~ o ClosedShape @@ -97,7 +97,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work towards Outlets" in { Await.result(RunnableGraph.fromGraph(FlowGraph.create(sink) { implicit b ⇒ s ⇒ - val o: Outlet[Int] = b.add(source) + val o: Outlet[Int] = b.add(source).outlet s <~ o ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 21459c2347..2f557e88d4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -570,15 +570,9 @@ object FlowGraph extends GraphApply { class Builder[+M] private[stream] () { private var moduleInProgress: Module = EmptyModule - private[FlowGraph] def addEdge[A1, A >: A1, B, B1 >: B, M2](from: Outlet[A1], via: Graph[FlowShape[A, B], M2], to: Inlet[B1]): Unit = { - val flowCopy = via.module.carbonCopy - moduleInProgress = - moduleInProgress - .compose(flowCopy) - .wire(from, flowCopy.shape.inlets.head) - .wire(flowCopy.shape.outlets.head, to) - } - + /** + * INTERNAL API + */ private[FlowGraph] def addEdge[T, U >: T](from: Outlet[T], to: Inlet[U]): Unit = moduleInProgress = moduleInProgress.wire(from, to) @@ -620,9 +614,6 @@ object FlowGraph extends GraphApply { graph.shape.copyFromPorts(copy.shape.inlets, copy.shape.outlets).asInstanceOf[S] } - def add[T](s: Source[T, _]): Outlet[T] = add(s: Graph[SourceShape[T], _]).outlet - def add[T](s: Sink[T, _]): Inlet[T] = add(s: Graph[SinkShape[T], _]).inlet - /** * Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized * this outlet will emit exactly one element which is the materialized value. It is possible to expose this