diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/MaterializationBenchmark.scala index 01a6f80e4f..79f6f6f893 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -42,7 +42,7 @@ object MaterializationBenchmark { for (_ <- 1 to numOfNestedGraphs) { flow = GraphDSL.create(flow) { b ⇒ flow ⇒ - FlowShape(flow.inlet, flow.outlet) + FlowShape(flow.in, flow.out) } } @@ -58,13 +58,13 @@ object MaterializationBenchmark { RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ source ⇒ import GraphDSL.Implicits._ val flow = Flow[Unit].map(identity) - var outlet: Outlet[Unit] = source.outlet + var out: Outlet[Unit] = source.out for (i <- 0 until numOfFlows) { val flowShape = b.add(flow) - outlet ~> flowShape - outlet = flowShape.outlet + out ~> flowShape + out = flowShape.outlet } - outlet ~> Sink.ignore + out ~> Sink.ignore ClosedShape }) } diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index 350d1a6d7a..b68e833a3f 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -45,7 +45,6 @@ Update procedure ``BidiFlow.fromFlows`` or ``BidiFlow.fromFlowsMat`` 5. Replace all uses of ``BidiFlow.apply()`` (Scala DSL) or ``BidiFlow.create()`` (Java DSL) when it converts two functions to a ``BidiFlow`` with ``BidiFlow.fromFunctions`` - Example ^^^^^^^ @@ -86,6 +85,20 @@ Should be replaced by .. includecode:: code/docs/MigrationsJava.java#bidi-wrap + +Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape`` +========================================================================================================== + +The input and output ports of these shapes where called ``inlet()`` and ``outlet()`` compared to other shapes that +consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``. + +Update procedure +---------------- + +Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the ports +of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`. + + FlowGraph class and builder methods have been renamed ===================================================== diff --git a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala index 500d6af14c..0fd0baf3fa 100644 --- a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala +++ b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala @@ -181,7 +181,7 @@ class MigrationsScala extends AkkaSpec { // absorbTermination turns into the code below. // This emulates the behavior of the AsyncStage stage. private def absorbTermination(): Unit = - if (isAvailable(shape.outlet)) getHandler(out).onPull() + if (isAvailable(shape.out)) getHandler(out).onPull() // The line below emulates the behavior of the AsyncStage holdingDownstream private def holdingDownstream(): Boolean = diff --git a/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala index b7439c0f74..54fe9bde7f 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/CompositionDocSpec.scala @@ -78,13 +78,13 @@ class CompositionDocSpec extends AkkaSpec { //#complex-graph import GraphDSL.Implicits._ RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => - val A: Outlet[Int] = builder.add(Source.single(0)).outlet + val A: Outlet[Int] = builder.add(Source.single(0)).out val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2)) val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1)) val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2)) val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) - val G: Inlet[Any] = builder.add(Sink.foreach(println)).inlet + val G: Inlet[Any] = builder.add(Sink.foreach(println)).in C <~ F A ~> B ~> C ~> F diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 8beaa33d78..e56ab5a3bf 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -80,8 +80,8 @@ class FlowGraphDocSpec extends AkkaSpec { val broadcast = builder.add(Broadcast[Int](2)) Source.single(1) ~> broadcast.in - broadcast.out(0) ~> sharedDoubler ~> topHS.inlet - broadcast.out(1) ~> sharedDoubler ~> bottomHS.inlet + broadcast.out(0) ~> sharedDoubler ~> topHS.in + broadcast.out(1) ~> sharedDoubler ~> bottomHS.in ClosedShape }) //#flow-graph-reusing-a-flow @@ -207,7 +207,7 @@ class FlowGraphDocSpec extends AkkaSpec { val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder ⇒ fold ⇒ - FlowShape(fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet) + FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet) }) //#flow-graph-matvalue diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index ac004d4b16..9851f27d15 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -41,7 +41,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { Source.single(1) ~> pm3.in(0) Source.single(2) ~> pm3.in(1) Source.single(3) ~> pm3.in(2) - pm3.out ~> sink.inlet + pm3.out ~> sink.in ClosedShape }) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index 087d315da7..4502237c7e 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -108,7 +108,7 @@ class StreamTcpDocSpec extends AkkaSpec { // then we continue using the echo-logic Flow echo.outlet ~> concat.in(1) - FlowShape(echo.inlet, concat.out) + FlowShape(echo.in, concat.out) }) connection.handleWith(serverLogic) diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index a6e62c6726..bf6d15b48a 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -368,6 +368,18 @@ Update procedure 1. All custom shapes must use ``@uncheckedVariance`` on their ``Inlet`` and ``Outlet`` members. +Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape`` +========================================================================================================== + +The input and output ports of these shapes where called ``inlet()`` and ``outlet()`` compared to other shapes that +consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``. + +Update procedure +---------------- + +Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the ports +of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`. + Semantic change in ``isHoldingUpstream`` in the DetachedStage DSL ================================================================= @@ -557,7 +569,7 @@ should be replaced by .. includecode:: code/docs/MigrationsScala.scala#file-source-sink InputStreamSource and OutputStreamSink -============================================ +====================================== Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues. diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index a7fbcca813..8f3421c6aa 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -100,8 +100,8 @@ private[http] object OutgoingConnectionBlueprint { BidiShape( methodBypassFanout.in, - wrapTls.outlet, - unwrapTls.inlet, + wrapTls.out, + unwrapTls.in, terminationFanout.out(1)) }) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 43850e0224..1e6ee2ce25 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -71,7 +71,7 @@ private object PoolSlot { slotProcessor ~> split.in - new FanOutShape2(slotProcessor.inlet, + new FanOutShape2(slotProcessor.in, split.out(0).collect { case ResponseDelivery(r) ⇒ r }.outlet, split.out(1).collect { case r: RawSlotEvent ⇒ r }.outlet) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala index a81d914f75..5c6c0ee698 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/rendering/RenderSupport.scala @@ -36,7 +36,7 @@ private object RenderSupport { frst ⇒ import GraphDSL.Implicits._ second ~> Sink.cancelled - SourceShape(frst.outlet) + SourceShape(frst.out) }) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala index 09ccbd8d52..21ef789361 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Websocket.scala @@ -146,8 +146,8 @@ private[http] object Websocket { BidiShape( split.in, - messagePreparation.outlet, - messageRendering.inlet, + messagePreparation.out, + messageRendering.in, merge.out) }.named("ws-message-api")) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala index 428af5d0b4..d14f627c0b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala @@ -122,9 +122,9 @@ object WebsocketClientBlueprint { wsIn.outlet ~> httpRequestBytesAndThenWSBytes BidiShape( - networkIn.inlet, - networkIn.outlet, - wsIn.inlet, + networkIn.in, + networkIn.out, + wsIn.in, httpRequestBytesAndThenWSBytes.out) }) mapMaterializedValue (_ ⇒ result.future) } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index bb4973a437..3b15ce7b36 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -55,7 +55,7 @@ public class BidiFlowTest extends StreamTest { } })); return new BidiShape(top - .inlet(), top.outlet(), bottom.inlet(), bottom.outlet()); + .in(), top.out(), bottom.in(), bottom.out()); } })); @@ -81,7 +81,7 @@ public class BidiFlowTest extends StreamTest { } })); return new BidiShape(top - .inlet(), top.outlet(), bottom.inlet(), bottom.outlet()); + .in(), top.out(), bottom.in(), bottom.out()); } })); @@ -109,7 +109,7 @@ public class BidiFlowTest extends StreamTest { } })); return new BidiShape(top - .inlet(), top.outlet(), bottom.inlet(), bottom.outlet()); + .in(), top.out(), bottom.in(), bottom.out()); } })); @@ -248,7 +248,7 @@ public class BidiFlowTest extends StreamTest { b.from(bcast).to(sink) .from(b.add(Source.single(1))).viaFanOut(bcast).toFanIn(merge) .from(flow).toFanIn(merge); - return new FlowShape(flow.inlet(), merge.out()); + return new FlowShape(flow.in(), merge.out()); } })); final Flow>> right = Flow.fromGraph(GraphDSL.create( @@ -258,7 +258,7 @@ public class BidiFlowTest extends StreamTest { SinkShape> sink) throws Exception { final FlowShape> flow = b.add(Flow.of(Long.class).grouped(10)); b.from(flow).to(sink); - return new FlowShape(flow.inlet(), b.add(Source.single(ByteString.fromString("10"))).outlet()); + return new FlowShape(flow.in(), b.add(Source.single(ByteString.fromString("10"))).out()); } })); final Pair, Future>, Future>> result = diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 927af7219a..b4b00096ba 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -364,8 +364,8 @@ public class FlowTest extends StreamTest { RunnableGraph.fromGraph(GraphDSL.create(new Function, ClosedShape>(){ public ClosedShape apply(Builder b) { - final Outlet in1 = b.add(Source.from(input1)).outlet(); - final Outlet in2 = b.add(Source.from(input2)).outlet(); + final Outlet in1 = b.add(Source.from(input1)).out(); + final Outlet in2 = b.add(Source.from(input2)).out(); final FanInShape2> zip = b.add(Zip.create()); final SinkShape> out = b.add(Sink.foreach(new Procedure>() { diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index bb7461b48a..73b7b9392d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -358,7 +358,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val bcast = b.add(Broadcast[String](2)) source1 ~> merge.in(0) - source2.outlet ~> merge.in(1) + source2.out ~> merge.in(1) merge.out.map(_.toString) ~> bcast.in diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index f875e36f40..61870f2e95 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -72,8 +72,8 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { val sink = new DownstreamProbe[Int]("sink") builder(detach) - .connect(source, detach.shape.inlet) - .connect(detach.shape.outlet, sink) + .connect(source, detach.shape.in) + .connect(detach.shape.out, sink) .init() lastEvents() should ===(Set.empty) @@ -313,8 +313,8 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { .connect(source, merge.in(0)) .connect(merge.out, balance.in) .connect(balance.out(0), sink) - .connect(balance.out(1), detach.shape.inlet) - .connect(detach.shape.outlet, merge.in(1)) + .connect(balance.out(1), detach.shape.in) + .connect(detach.shape.out, merge.in(1)) .init() lastEvents() should ===(Set.empty) @@ -351,8 +351,8 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { Attributes.none) builder(buffer) - .connect(source, buffer.shape.inlet) - .connect(buffer.shape.outlet, sink) + .connect(source, buffer.shape.in) + .connect(buffer.shape.out, sink) .init() stepAll() diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index 478d515ee4..34bdb12304 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -339,9 +339,9 @@ trait GraphInterpreterSpecKit { while (i < ops.length) { val stage = ops(i).asInstanceOf[PushPullGraphStage[_, _, _]] - ins(i) = stage.shape.inlet + ins(i) = stage.shape.in inOwners(i) = i - outs(i + 1) = stage.shape.outlet + outs(i + 1) = stage.shape.out outOwners(i + 1) = i i += 1 } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala index 32c9e2e375..e0be7ba52f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/KeepGoingStageSpec.scala @@ -66,8 +66,8 @@ class KeepGoingStageSpec extends AkkaSpec { } finally listener.foreach(_ ! EndOfEventHandler) } - setHandler(shape.inlet, new InHandler { - override def onPush(): Unit = pull(shape.inlet) + setHandler(shape.in, new InHandler { + override def onPush(): Unit = pull(shape.in) // Ignore finish override def onUpstreamFinish(): Unit = listener.foreach(_ ! UpstreamCompleted) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index 8d960f3bf8..b05d906a0f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -32,7 +32,7 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { val top = b.add(Flow[Int].map(x ⇒ x.toLong + 2)) val bottom = b.add(Flow[ByteString].map(_.decodeString("UTF-8"))) - BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + BidiShape(top.in, top.out, bottom.in, bottom.out) }) val str = "Hello World" @@ -97,13 +97,13 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { bcast ~> sink Source.single(1) ~> bcast ~> merge flow ~> merge - FlowShape(flow.inlet, merge.out) + FlowShape(flow.in, merge.out) }) val right = Flow.fromGraph(GraphDSL.create(Sink.head[immutable.Seq[Long]]) { implicit b ⇒ sink ⇒ val flow = b.add(Flow[Long].grouped(10)) flow ~> sink - FlowShape(flow.inlet, b.add(Source.single(ByteString("10"))).outlet) + FlowShape(flow.in, b.add(Source.single(ByteString("10"))).out) }) val ((l, m), r) = left.joinMat(bidiMat)(Keep.both).joinMat(right)(Keep.both).run() Await.result(l, 1.second) should ===(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index ad3d3d7c5b..43a049d01e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -69,9 +69,9 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val assembly = new GraphAssembly( Array(stage), Array(Attributes.none), - Array(stage.shape.inlet, null), + Array(stage.shape.in, null), Array(0, -1), - Array(null, stage.shape.outlet), + Array(null, stage.shape.out), Array(-1, 0)) val (inHandlers, outHandlers, logics) = diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala index 8d31da600f..c6a92aefa7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala @@ -65,7 +65,7 @@ class GraphFlowSpec extends AkkaSpec { val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ import GraphDSL.Implicits._ - FlowShape(partial.inlet, partial.outlet.map(_.toInt).outlet) + FlowShape(partial.in, partial.out.map(_.toInt).outlet) }) source1.via(flow).to(Sink(probe)).run() @@ -77,7 +77,7 @@ class GraphFlowSpec extends AkkaSpec { val probe = TestSubscriber.manualProbe[Int]() val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ - partial ⇒ FlowShape(partial.inlet, partial.outlet) + partial ⇒ FlowShape(partial.in, partial.out) }) source1.via(flow).map(_.toInt).to(Sink(probe)).run() @@ -90,12 +90,12 @@ class GraphFlowSpec extends AkkaSpec { val flow1 = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ - FlowShape(partial.inlet, partial.outlet) + FlowShape(partial.in, partial.out) }) val flow2 = Flow.fromGraph(GraphDSL.create(Flow[String].map(_.toInt)) { implicit b ⇒ importFlow ⇒ - FlowShape(importFlow.inlet, importFlow.outlet) + FlowShape(importFlow.in, importFlow.out) }) source1.via(flow1).via(flow2).to(Sink(probe)).run() @@ -107,7 +107,7 @@ class GraphFlowSpec extends AkkaSpec { val probe = TestSubscriber.manualProbe[Int]() val flow = Flow.fromGraph(GraphDSL.create(Flow[Int].map(_ * 2)) { implicit b ⇒ - importFlow ⇒ FlowShape(importFlow.inlet, importFlow.outlet) + importFlow ⇒ FlowShape(importFlow.in, importFlow.out) }) RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ @@ -127,8 +127,8 @@ class GraphFlowSpec extends AkkaSpec { val source = Source.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ import GraphDSL.Implicits._ - source1 ~> partial.inlet - SourceShape(partial.outlet.map(_.toInt).outlet) + source1 ~> partial.in + SourceShape(partial.out.map(_.toInt).outlet) }) source.to(Sink(probe)).run() @@ -152,8 +152,8 @@ class GraphFlowSpec extends AkkaSpec { val source = Source.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ import GraphDSL.Implicits._ - source1 ~> partial.inlet - SourceShape(partial.outlet) + source1 ~> partial.in + SourceShape(partial.out) }) source.map(_.toInt).to(Sink(probe)).run() @@ -167,13 +167,13 @@ class GraphFlowSpec extends AkkaSpec { val source = Source.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ import GraphDSL.Implicits._ - source1 ~> partial.inlet - SourceShape(partial.outlet) + source1 ~> partial.in + SourceShape(partial.out) }) val flow = Flow.fromGraph(GraphDSL.create(Flow[String].map(_.toInt)) { implicit b ⇒ importFlow ⇒ - FlowShape(importFlow.inlet, importFlow.outlet) + FlowShape(importFlow.in, importFlow.out) }) source.via(flow).to(Sink(probe)).run() @@ -187,16 +187,16 @@ class GraphFlowSpec extends AkkaSpec { val source = Source.fromGraph(GraphDSL.create(Source(1 to 5)) { implicit b ⇒ s ⇒ import GraphDSL.Implicits._ - SourceShape(s.outlet.map(_ * 2).outlet) + SourceShape(s.out.map(_ * 2).outlet) }) RunnableGraph.fromGraph(GraphDSL.create(source, source)(Keep.both) { implicit b ⇒ (s1, s2) ⇒ import GraphDSL.Implicits._ val merge = b.add(Merge[Int](2)) - s1.outlet ~> merge.in(0) + s1.out ~> merge.in(0) merge.out ~> Sink(probe) - s2.outlet.map(_ * 10) ~> merge.in(1) + s2.out.map(_ * 10) ~> merge.in(1) ClosedShape }).run() @@ -211,8 +211,8 @@ class GraphFlowSpec extends AkkaSpec { val sink = Sink.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ import GraphDSL.Implicits._ - partial.outlet.map(_.toInt) ~> Sink(probe) - SinkShape(partial.inlet) + partial.out.map(_.toInt) ~> Sink(probe) + SinkShape(partial.in) }) source1.to(sink).run() @@ -225,7 +225,7 @@ class GraphFlowSpec extends AkkaSpec { val pubSink = Sink.publisher[Int](false) val sink = Sink.fromGraph(GraphDSL.create(pubSink) { implicit b ⇒ - p ⇒ SinkShape(p.inlet) + p ⇒ SinkShape(p.in) }) val mm = source1.runWith(sink) @@ -240,9 +240,9 @@ class GraphFlowSpec extends AkkaSpec { val sink = Sink.fromGraph(GraphDSL.create(partialGraph, Flow[String].map(_.toInt))(Keep.both) { implicit b ⇒ (partial, flow) ⇒ import GraphDSL.Implicits._ - flow.outlet ~> partial.inlet - partial.outlet.map(_.toInt) ~> Sink(probe) - SinkShape(flow.inlet) + flow.out ~> partial.in + partial.out.map(_.toInt) ~> Sink(probe) + SinkShape(flow.in) }) val iSink = Flow[Int].map(_.toString).to(sink) @@ -257,14 +257,14 @@ class GraphFlowSpec extends AkkaSpec { val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ - FlowShape(partial.inlet, partial.outlet) + FlowShape(partial.in, partial.out) }) val sink = Sink.fromGraph(GraphDSL.create(Flow[String].map(_.toInt)) { implicit b ⇒ flow ⇒ import GraphDSL.Implicits._ - flow.outlet ~> Sink(probe) - SinkShape(flow.inlet) + flow.out ~> Sink(probe) + SinkShape(flow.in) }) source1.via(flow).to(sink).run() @@ -282,28 +282,28 @@ class GraphFlowSpec extends AkkaSpec { val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒ partial ⇒ import GraphDSL.Implicits._ - FlowShape(partial.inlet, partial.outlet.map(_.toInt).outlet) + FlowShape(partial.in, partial.out.map(_.toInt).outlet) }) val source = Source.fromGraph(GraphDSL.create(Flow[Int].map(_.toString), inSource)(Keep.right) { implicit b ⇒ (flow, src) ⇒ import GraphDSL.Implicits._ - src.outlet ~> flow.inlet - SourceShape(flow.outlet) + src.out ~> flow.in + SourceShape(flow.out) }) val sink = Sink.fromGraph(GraphDSL.create(Flow[String].map(_.toInt), outSink)(Keep.right) { implicit b ⇒ (flow, snk) ⇒ import GraphDSL.Implicits._ - flow.outlet ~> snk.inlet - SinkShape(flow.inlet) + flow.out ~> snk.in + SinkShape(flow.in) }) val (m1, m2, m3) = RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)(Tuple3.apply) { implicit b ⇒ (src, f, snk) ⇒ import GraphDSL.Implicits._ - src.outlet.map(_.toInt) ~> f.inlet - f.outlet.map(_.toString) ~> snk.inlet + src.out.map(_.toInt) ~> f.in + f.out.map(_.toString) ~> snk.in ClosedShape }).run() @@ -322,18 +322,18 @@ class GraphFlowSpec extends AkkaSpec { val source = Source.fromGraph(GraphDSL.create(inSource) { implicit b ⇒ src ⇒ - SourceShape(src.outlet) + SourceShape(src.out) }) val sink = Sink.fromGraph(GraphDSL.create(outSink) { implicit b ⇒ snk ⇒ - SinkShape(snk.inlet) + SinkShape(snk.in) }) val (m1, m2) = RunnableGraph.fromGraph(GraphDSL.create(source, sink)(Keep.both) { implicit b ⇒ (src, snk) ⇒ import GraphDSL.Implicits._ - src.outlet ~> snk.inlet + src.out ~> snk.in ClosedShape }).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala index f70c77de49..c94ed1c69b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala @@ -63,11 +63,11 @@ class GraphBroadcastSpec extends AkkaSpec { (p1, p2, p3, p4, p5) ⇒ val bcast = b.add(Broadcast[Int](5)) Source(List(1, 2, 3)) ~> bcast.in - bcast.out(0).grouped(5) ~> p1.inlet - bcast.out(1).grouped(5) ~> p2.inlet - bcast.out(2).grouped(5) ~> p3.inlet - bcast.out(3).grouped(5) ~> p4.inlet - bcast.out(4).grouped(5) ~> p5.inlet + bcast.out(0).grouped(5) ~> p1.in + bcast.out(1).grouped(5) ~> p2.in + bcast.out(2).grouped(5) ~> p3.in + bcast.out(3).grouped(5) ~> p4.in + bcast.out(4).grouped(5) ~> p5.in ClosedShape }).run() @@ -94,28 +94,28 @@ class GraphBroadcastSpec extends AkkaSpec { (p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16, p17, p18, p19, p20, p21, p22) ⇒ val bcast = b.add(Broadcast[Int](22)) Source(List(1, 2, 3)) ~> bcast.in - bcast.out(0).grouped(5) ~> p1.inlet - bcast.out(1).grouped(5) ~> p2.inlet - bcast.out(2).grouped(5) ~> p3.inlet - bcast.out(3).grouped(5) ~> p4.inlet - bcast.out(4).grouped(5) ~> p5.inlet - bcast.out(5).grouped(5) ~> p6.inlet - bcast.out(6).grouped(5) ~> p7.inlet - bcast.out(7).grouped(5) ~> p8.inlet - bcast.out(8).grouped(5) ~> p9.inlet - bcast.out(9).grouped(5) ~> p10.inlet - bcast.out(10).grouped(5) ~> p11.inlet - bcast.out(11).grouped(5) ~> p12.inlet - bcast.out(12).grouped(5) ~> p13.inlet - bcast.out(13).grouped(5) ~> p14.inlet - bcast.out(14).grouped(5) ~> p15.inlet - bcast.out(15).grouped(5) ~> p16.inlet - bcast.out(16).grouped(5) ~> p17.inlet - bcast.out(17).grouped(5) ~> p18.inlet - bcast.out(18).grouped(5) ~> p19.inlet - bcast.out(19).grouped(5) ~> p20.inlet - bcast.out(20).grouped(5) ~> p21.inlet - bcast.out(21).grouped(5) ~> p22.inlet + bcast.out(0).grouped(5) ~> p1.in + bcast.out(1).grouped(5) ~> p2.in + bcast.out(2).grouped(5) ~> p3.in + bcast.out(3).grouped(5) ~> p4.in + bcast.out(4).grouped(5) ~> p5.in + bcast.out(5).grouped(5) ~> p6.in + bcast.out(6).grouped(5) ~> p7.in + bcast.out(7).grouped(5) ~> p8.in + bcast.out(8).grouped(5) ~> p9.in + bcast.out(9).grouped(5) ~> p10.in + bcast.out(10).grouped(5) ~> p11.in + bcast.out(11).grouped(5) ~> p12.in + bcast.out(12).grouped(5) ~> p13.in + bcast.out(13).grouped(5) ~> p14.in + bcast.out(14).grouped(5) ~> p15.in + bcast.out(15).grouped(5) ~> p16.in + bcast.out(16).grouped(5) ~> p17.in + bcast.out(17).grouped(5) ~> p18.in + bcast.out(18).grouped(5) ~> p19.in + bcast.out(19).grouped(5) ~> p20.in + bcast.out(20).grouped(5) ~> p21.in + bcast.out(21).grouped(5) ~> p22.in ClosedShape }).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index 700f585122..07bc643d7c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -83,16 +83,16 @@ class GraphMatValueSpec extends AkkaSpec { (s1, s2) ⇒ val zip = b.add(ZipWith[Int, Int, Int](_ + _)) - s1.outlet.mapAsync(4)(identity) ~> zip.in0 - s2.outlet.mapAsync(4)(identity).map(_ * 100) ~> zip.in1 + s1.out.mapAsync(4)(identity) ~> zip.in0 + s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1 SourceShape(zip.out) }) val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b ⇒ (s1, s2) ⇒ val zip = b.add(ZipWith[Int, Int, Int](_ + _)) - s1.outlet ~> zip.in0 - s2.outlet.map(_ * 10000) ~> zip.in1 + s1.out ~> zip.in0 + s2.out.map(_ * 10000) ~> zip.in1 SourceShape(zip.out) }) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala index 8b0685adfc..3fd9e83557 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala @@ -37,7 +37,7 @@ class GraphMergePreferredSpec extends TwoStreamsSetup { val merge = b.add(MergePreferred[Int](3)) preferred ~> merge.preferred - merge.out.grouped(numElements * 2) ~> sink.inlet + merge.out.grouped(numElements * 2) ~> sink.in aux ~> merge.in(0) aux ~> merge.in(1) aux ~> merge.in(2) @@ -53,7 +53,7 @@ class GraphMergePreferredSpec extends TwoStreamsSetup { val merge = b.add(MergePreferred[Int](3)) Source(1 to 100) ~> merge.preferred - merge.out.grouped(500) ~> sink.inlet + merge.out.grouped(500) ~> sink.in Source(101 to 200) ~> merge.in(0) Source(201 to 300) ~> merge.in(1) Source(301 to 400) ~> merge.in(2) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala index b2cee7eb71..7e28a490ae 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala @@ -157,8 +157,8 @@ class GraphMergeSpec extends TwoStreamsSetup { val (graphSubscriber1, graphSubscriber2) = RunnableGraph.fromGraph(GraphDSL.create(src1, src2)((_, _)) { implicit b ⇒ (s1, s2) ⇒ val merge = b.add(Merge[Int](2)) - s1.outlet ~> merge.in(0) - s2.outlet ~> merge.in(1) + s1.out ~> merge.in(0) + s2.out ~> merge.in(1) merge.out ~> Sink(down) ClosedShape }).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index 64111b714a..ab46d81982 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -62,7 +62,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual Source(List(1, 2, 3)) ~> bcast.in bcast.out(0) ~> merge.in(0) bcast.out(1).map(_ + 3) ~> merge.in(1) - merge.out.grouped(10) ~> sink.inlet + merge.out.grouped(10) ~> sink.in ClosedShape }).run() @@ -80,7 +80,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual for (i ← 0 until 5) balance.out(i) ~> merge.in(i) - merge.out.grouped(elements.size * 2) ~> sink.inlet + merge.out.grouped(elements.size * 2) ~> sink.in ClosedShape }).run() @@ -118,15 +118,15 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual // Second layer m11.out ~> b11.in - b11.out(0).grouped(1000) ~> sink2.inlet // Vertex 2 is omitted since it has only one in and out + b11.out(0).grouped(1000) ~> sink2.in // Vertex 2 is omitted since it has only one in and out b11.out(1) ~> m9.in(0) b11.out(2) ~> m10.in(1) m8.out ~> m9.in(1) // Third layer - m9.out.grouped(1000) ~> sink9.inlet - m10.out.grouped(1000) ~> sink10.inlet + m9.out.grouped(1000) ~> sink9.in + m10.out.grouped(1000) ~> sink10.in ClosedShape }).run() @@ -147,7 +147,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual Source(List(1, 2, 3)).map(_ * 2) ~> bcast.in bcast.out(0) ~> merge.in(0) bcast.out(1).map(_ + 3) ~> merge.in(1) - merge.out.grouped(10) ~> sink.inlet + merge.out.grouped(10) ~> sink.in ClosedShape }).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala index 5a0f96c267..162224b859 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala @@ -29,9 +29,9 @@ class GraphPartialSpec extends AkkaSpec { val (_, _, result) = RunnableGraph.fromGraph(GraphDSL.create(doubler, doubler, Sink.head[Seq[Int]])(Tuple3.apply) { implicit b ⇒ (d1, d2, sink) ⇒ - Source(List(1, 2, 3)) ~> d1.inlet - d1.outlet ~> d2.inlet - d2.outlet.grouped(100) ~> sink.inlet + Source(List(1, 2, 3)) ~> d1.in + d1.out ~> d2.in + d2.out.grouped(100) ~> sink.in ClosedShape }).run() @@ -46,15 +46,15 @@ class GraphPartialSpec extends AkkaSpec { bcast.out(0) ~> zip.in0 bcast.out(1) ~> zip.in1 - bcast.out(2).grouped(100) ~> sink.inlet + bcast.out(2).grouped(100) ~> sink.in FlowShape(bcast.in, zip.out) } val (sub1, sub2, result) = RunnableGraph.fromGraph(GraphDSL.create(doubler, doubler, Sink.head[Seq[Int]])(Tuple3.apply) { implicit b ⇒ (d1, d2, sink) ⇒ - Source(List(1, 2, 3)) ~> d1.inlet - d1.outlet ~> d2.inlet - d2.outlet.grouped(100) ~> sink.inlet + Source(List(1, 2, 3)) ~> d1.in + d1.out ~> d2.in + d2.out.grouped(100) ~> sink.in ClosedShape }).run() @@ -74,19 +74,19 @@ class GraphPartialSpec extends AkkaSpec { bcast.out(0) ~> zip.in0 bcast.out(1) ~> zip.in1 - bcast.out(2) ~> s1.inlet + bcast.out(2) ~> s1.in zip.out ~> bcast2.in - bcast2.out(0) ~> s2.inlet + bcast2.out(0) ~> s2.in FlowShape(bcast.in, bcast2.out(1)) } val (sub1, sub2, result) = RunnableGraph.fromGraph(GraphDSL.create(doubler, doubler, Sink.head[Seq[Int]])(Tuple3.apply) { implicit b ⇒ (d1, d2, sink) ⇒ - Source(List(1, 2, 3)) ~> d1.inlet - d1.outlet ~> d2.inlet - d2.outlet.grouped(100) ~> sink.inlet + Source(List(1, 2, 3)) ~> d1.in + d1.out ~> d2.in + d2.out.grouped(100) ~> sink.in ClosedShape }).run() @@ -100,14 +100,14 @@ class GraphPartialSpec extends AkkaSpec { "be able to expose the ports of imported graphs" in { val p = GraphDSL.create(Flow[Int].map(_ + 1)) { implicit b ⇒ flow ⇒ - FlowShape(flow.inlet, flow.outlet) + FlowShape(flow.in, flow.out) } val fut = RunnableGraph.fromGraph(GraphDSL.create(Sink.head[Int], p)(Keep.left) { implicit b ⇒ (sink, flow) ⇒ import GraphDSL.Implicits._ - Source.single(0) ~> flow.inlet - flow.outlet ~> sink.inlet + Source.single(0) ~> flow.in + flow.out ~> sink.in ClosedShape }).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala index a257f14460..d413fd44b0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala @@ -26,8 +26,8 @@ class PublisherSinkSpec extends AkkaSpec { val bcast = b.add(Broadcast[Int](2)) Source(0 to 5) ~> bcast.in - bcast.out(0).map(_ * 2) ~> p1.inlet - bcast.out(1) ~> p2.inlet + bcast.out(0).map(_ * 2) ~> p1.in + bcast.out(1) ~> p2.in ClosedShape }).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala index 812144c85d..c5ba42bcd8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ReverseArrowSpec.scala @@ -18,7 +18,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work from Inlets" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - s.inlet <~ source + s.in <~ source ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) } @@ -44,7 +44,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "not work from Outlets" in { RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ - val o: Outlet[Int] = b.add(source).outlet + val o: Outlet[Int] = b.add(source).out "o <~ source" shouldNot compile sink <~ o ClosedShape @@ -99,7 +99,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "work towards Outlets" in { Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒ s ⇒ - val o: Outlet[Int] = b.add(source).outlet + val o: Outlet[Int] = b.add(source).out s <~ o ClosedShape }).run(), 1.second) should ===(Seq(1, 2, 3)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index c59c85ef98..ce9ff20af6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -35,7 +35,7 @@ class SinkSpec extends AkkaSpec { val sink = Sink.fromGraph(GraphDSL.create(Sink(probes(0))) { implicit b ⇒ s0 ⇒ val bcast = b.add(Broadcast[Int](3)) - bcast.out(0) ~> Flow[Int].filter(_ == 0) ~> s0.inlet + bcast.out(0) ~> Flow[Int].filter(_ == 0) ~> s0.in for (i ← 1 to 2) bcast.out(i).filter(_ == i) ~> Sink(probes(i)) SinkShape(bcast.in) }) @@ -52,8 +52,8 @@ class SinkSpec extends AkkaSpec { val sink = Sink.fromGraph(GraphDSL.create(Sink(probes(0)), Sink(probes(1)))(List(_, _)) { implicit b ⇒ (s0, s1) ⇒ val bcast = b.add(Broadcast[Int](3)) - bcast.out(0).filter(_ == 0) ~> s0.inlet - bcast.out(1).filter(_ == 1) ~> s1.inlet + bcast.out(0).filter(_ == 0) ~> s0.in + bcast.out(1).filter(_ == 1) ~> s1.in bcast.out(2).filter(_ == 2) ~> Sink(probes(2)) SinkShape(bcast.in) }) @@ -70,9 +70,9 @@ class SinkSpec extends AkkaSpec { val sink = Sink.fromGraph(GraphDSL.create(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(List(_, _, _)) { implicit b ⇒ (s0, s1, s2) ⇒ val bcast = b.add(Broadcast[Int](3)) - bcast.out(0).filter(_ == 0) ~> s0.inlet - bcast.out(1).filter(_ == 1) ~> s1.inlet - bcast.out(2).filter(_ == 2) ~> s2.inlet + bcast.out(0).filter(_ == 0) ~> s0.in + bcast.out(1).filter(_ == 1) ~> s1.in + bcast.out(2).filter(_ == 2) ~> s2.in SinkShape(bcast.in) }) Source(List(0, 1, 2)).runWith(sink) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 2c330715c8..08f138727b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -141,11 +141,11 @@ class SourceSpec extends AkkaSpec { (i0, i1, i2, i3, i4) ⇒ import GraphDSL.Implicits._ val m = b.add(Merge[Int](5)) - i0.outlet ~> m.in(0) - i1.outlet ~> m.in(1) - i2.outlet ~> m.in(2) - i3.outlet ~> m.in(3) - i4.outlet ~> m.in(4) + i0.out ~> m.in(0) + i1.out ~> m.in(1) + i2.out ~> m.in(2) + i3.out ~> m.in(3) + i4.out ~> m.in(4) SourceShape(m.out) }).to(Sink(out)).run() diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index 910a770f48..f478505a62 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -232,11 +232,11 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se * A Source [[Shape]] has exactly one output and no inputs, it models a source * of data. */ -final case class SourceShape[+T](outlet: Outlet[T @uncheckedVariance]) extends Shape { +final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape { override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq - override val outlets: immutable.Seq[Outlet[_]] = List(outlet) + override val outlets: immutable.Seq[Outlet[_]] = List(out) - override def deepCopy(): SourceShape[T] = SourceShape(outlet.carbonCopy()) + override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit SourceShape") require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit SourceShape") @@ -254,11 +254,11 @@ object SourceShape { * outside like a pipe (but it can be a complex topology of streams within of * course). */ -final case class FlowShape[-I, +O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]) extends Shape { - override val inlets: immutable.Seq[Inlet[_]] = List(inlet) - override val outlets: immutable.Seq[Outlet[_]] = List(outlet) +final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape { + override val inlets: immutable.Seq[Inlet[_]] = List(in) + override val outlets: immutable.Seq[Outlet[_]] = List(out) - override def deepCopy(): FlowShape[I, O] = FlowShape(inlet.carbonCopy(), outlet.carbonCopy()) + override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit FlowShape") require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FlowShape") @@ -274,11 +274,11 @@ object FlowShape { /** * A Sink [[Shape]] has exactly one input and no outputs, it models a data sink. */ -final case class SinkShape[-T](inlet: Inlet[T @uncheckedVariance]) extends Shape { - override val inlets: immutable.Seq[Inlet[_]] = List(inlet) +final case class SinkShape[-T](in: Inlet[T @uncheckedVariance]) extends Shape { + override val inlets: immutable.Seq[Inlet[_]] = List(in) override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq - override def deepCopy(): SinkShape[T] = SinkShape(inlet.carbonCopy()) + override def deepCopy(): SinkShape[T] = SinkShape(in.carbonCopy()) override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = { require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit SinkShape") require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit SinkShape") @@ -315,7 +315,7 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVa /** * Java API for creating from a pair of unidirectional flows. */ - def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.in, top.out, bottom.in, bottom.out) override def deepCopy(): BidiShape[In1, Out1, In2, Out2] = BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy()) @@ -330,7 +330,7 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVa //#bidi-shape object BidiShape { def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] = - BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) + BidiShape(top.in, top.out, bottom.in, bottom.out) /** Java API */ def of[In1, Out1, In2, Out2](in1: Inlet[In1 @uncheckedVariance], diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 1f19fe22c0..c3f8e047a5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -98,11 +98,11 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, atomic match { case sink: SinkModule[_, _] ⇒ val (sub, mat) = sink.create(newMaterializationContext()) - assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]]) + assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]]) matVal.put(atomic, mat) case source: SourceModule[_, _] ⇒ val (pub, mat) = source.create(newMaterializationContext()) - assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]]) + assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]]) matVal.put(atomic, mat) // FIXME: Remove this, only stream-of-stream ops need it diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 4b13df795c..2180eb0523 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -31,7 +31,7 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out // This is okay since the only caller of this method is right below. protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat] - override def carbonCopy: Module = newInstance(SourceShape(shape.outlet.carbonCopy())) + override def carbonCopy: Module = newInstance(SourceShape(shape.out.carbonCopy())) override def subModules: Set[Module] = Set.empty @@ -40,7 +40,7 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out val thatN = attr.nameOrDefault(null) if ((thatN eq null) || thisN == thatN) shape - else shape.copy(outlet = Outlet(thatN + ".out")) + else shape.copy(out = Outlet(thatN + ".out")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 2c2d33b414..f66a41dd3c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -29,7 +29,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte // This is okay since we the only caller of this method is right below. protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat] - override def carbonCopy: Module = newInstance(SinkShape(shape.inlet.carbonCopy())) + override def carbonCopy: Module = newInstance(SinkShape(shape.in.carbonCopy())) override def subModules: Set[Module] = Set.empty @@ -38,7 +38,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte val thatN = attr.nameOrDefault(null) if ((thatN eq null) || thisN == thatN) shape - else shape.copy(inlet = Inlet(thatN + ".in")) + else shape.copy(in = Inlet(thatN + ".in")) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index b650a15045..fa94d17dd7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -129,9 +129,9 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S val op = opsIterator.next().asInstanceOf[Stage[Any, Any]] val stage = new PushPullGraphStage((_) ⇒ op, Attributes.none) stages(i) = stage - ins(i) = stage.shape.inlet + ins(i) = stage.shape.in inOwners(i) = i - outs(i + 1) = stage.shape.outlet + outs(i + 1) = stage.shape.out outOwners(i + 1) = i i += 1 } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 9928e267f8..c346720adb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -173,8 +173,8 @@ private[stream] object TcpConnectionStage { class TcpStreamLogic(val shape: FlowShape[ByteString, ByteString], val role: TcpRole) extends GraphStageLogic(shape) { implicit private var self: StageActorRef = _ - private def bytesIn = shape.inlet - private def bytesOut = shape.outlet + private def bytesIn = shape.in + private def bytesOut = shape.out private var connection: ActorRef = _ // No reading until role have been decided diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index f676ff255e..989b466559 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -307,14 +307,14 @@ object GraphDSL extends GraphCreate { def materializedValue: Outlet[Mat @uncheckedVariance] = delegate.materializedValue def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out) - def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet) - def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet) + def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.out) + def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.out) def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out) def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0)) def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in) - def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet) - def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet) + def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.in) + def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.in) def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index a36b2478fb..8042526b55 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -170,7 +170,7 @@ object BidiFlow { flow1: Graph[FlowShape[I1, O1], M1], flow2: Graph[FlowShape[I2, O2], M2])(combine: (M1, M2) ⇒ M): BidiFlow[I1, O1, I2, O2, M] = fromGraph(GraphDSL.create(flow1, flow2)(combine) { - implicit b ⇒ (f1, f2) ⇒ BidiShape(f1.inlet, f1.outlet, f2.inlet, f2.outlet) + implicit b ⇒ (f1, f2) ⇒ BidiShape(f1.in, f1.out, f2.in, f2.out) }) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 827b76ba88..abe38b1c5e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -46,8 +46,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val flowCopy = flow.module.carbonCopy new Flow( module - .fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine) - .replaceShape(FlowShape(shape.inlet, flowCopy.shape.outlets.head))) + .fuse(flowCopy, shape.out, flowCopy.shape.inlets.head, combine) + .replaceShape(FlowShape(shape.in, flowCopy.shape.outlets.head))) } /** @@ -93,8 +93,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val sinkCopy = sink.module.carbonCopy new Sink( module - .fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine) - .replaceShape(SinkShape(shape.inlet))) + .fuse(sinkCopy, shape.out, sinkCopy.shape.inlets.head, combine) + .replaceShape(SinkShape(shape.in))) } } @@ -136,8 +136,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) RunnableGraph( module .compose(flowCopy, combine) - .wire(shape.outlet, flowCopy.shape.inlets.head) - .wire(flowCopy.shape.outlets.head, shape.inlet)) + .wire(shape.out, flowCopy.shape.inlets.head) + .wire(flowCopy.shape.outlets.head, shape.in)) } /** @@ -181,8 +181,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val outs = copy.shape.outlets new Flow(module .compose(copy, combine) - .wire(shape.outlet, ins.head) - .wire(outs(1), shape.inlet) + .wire(shape.out, ins.head) + .wire(outs(1), shape.in) .replaceShape(FlowShape(ins(1), outs.head))) } @@ -191,14 +191,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) private[stream] override def deprecatedAndThen[U](op: StageModule): Repr[U] = { //No need to copy here, op is a fresh instance if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U]] - else new Flow(module.fuse(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort))) + else new Flow(module.fuse(op, shape.out, op.inPort).replaceShape(FlowShape(shape.in, op.outPort))) } // FIXME: Only exists to keep old stuff alive private[akka] def deprecatedAndThenMat[U, Mat2, O >: Out](processorFactory: () ⇒ (Processor[O, U], Mat2)): ReprMat[U, Mat2] = { val op = DirectProcessor(processorFactory.asInstanceOf[() ⇒ (Processor[Any, Any], Any)]) if (this.isIdentity) new Flow(op).asInstanceOf[ReprMat[U, Mat2]] - else new Flow[In, U, Mat2](module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort))) + else new Flow[In, U, Mat2](module.fuse(op, shape.out, op.inPort, Keep.right).replaceShape(FlowShape(shape.in, op.outPort))) } /** @@ -289,7 +289,7 @@ object Flow { * Helper to create `Flow` from a `Sink`and a `Source`. */ def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(f: (M1, M2) ⇒ M): Flow[I, O, M] = - fromGraph(GraphDSL.create(sink, source)(f) { implicit b ⇒ (in, out) ⇒ FlowShape(in.inlet, out.outlet) }) + fromGraph(GraphDSL.create(sink, source)(f) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) }) } object RunnableGraph { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index c20052b4a0..0caa93b1b8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -812,8 +812,8 @@ object GraphDSL extends GraphApply { def ~>[Out](via: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out] = { val s = b.add(via) - b.addEdge(importAndGetPort(b), s.inlet) - s.outlet + b.addEdge(importAndGetPort(b), s.in) + s.out } def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = { @@ -836,15 +836,15 @@ object GraphDSL extends GraphApply { } def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = { - b.addEdge(importAndGetPort(b), flow.inlet) - flow.outlet + b.addEdge(importAndGetPort(b), flow.in) + flow.out } def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit = - b.addEdge(importAndGetPort(b), b.add(to).inlet) + b.addEdge(importAndGetPort(b), b.add(to).in) def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = - b.addEdge(importAndGetPort(b), to.inlet) + b.addEdge(importAndGetPort(b), to.in) } sealed trait ReverseCombinerBase[T] extends Any { @@ -855,8 +855,8 @@ object GraphDSL extends GraphApply { def <~[In](via: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In] = { val s = b.add(via) - b.addEdge(s.outlet, importAndGetPortReverse(b)) - s.inlet + b.addEdge(s.out, importAndGetPortReverse(b)) + s.in } def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = { @@ -879,15 +879,15 @@ object GraphDSL extends GraphApply { } def <~[In](flow: FlowShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = { - b.addEdge(flow.outlet, importAndGetPortReverse(b)) - flow.inlet + b.addEdge(flow.out, importAndGetPortReverse(b)) + flow.in } def <~(from: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit = - b.addEdge(b.add(from).outlet, importAndGetPortReverse(b)) + b.addEdge(b.add(from).out, importAndGetPortReverse(b)) def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit = - b.addEdge(from.outlet, importAndGetPortReverse(b)) + b.addEdge(from.out, importAndGetPortReverse(b)) } // Although Mat is always Unit, it cannot be removed as a type parameter, otherwise the "override type" @@ -911,7 +911,7 @@ object GraphDSL extends GraphApply { override private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U] = { b.deprecatedAndThen(outlet, op) - new PortOpsImpl(op.shape.outlet.asInstanceOf[Outlet[U]], b) + new PortOpsImpl(op.shape.out.asInstanceOf[Outlet[U]], b) } def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed = @@ -944,33 +944,33 @@ object GraphDSL extends GraphApply { } implicit final class SinkArrow[T](val s: Graph[SinkShape[T], _]) extends AnyVal with ReverseCombinerBase[T] { - override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s).inlet + override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s).in } implicit final class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] { - override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = s.inlet + override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = s.in } implicit final class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] { - override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet + override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.in def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val shape = b.add(bidi) - b.addEdge(f.outlet, shape.in1) - b.addEdge(shape.out2, f.inlet) + b.addEdge(f.out, shape.in1) + b.addEdge(shape.out2, f.in) shape } def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { - b.addEdge(f.outlet, bidi.in1) - b.addEdge(bidi.out2, f.inlet) + b.addEdge(f.out, bidi.in1) + b.addEdge(bidi.out2, f.in) bidi } def <~>[M](flow: Graph[FlowShape[O, I], M])(implicit b: Builder[_]): Unit = { val shape = b.add(flow) - b.addEdge(shape.outlet, f.inlet) - b.addEdge(f.outlet, shape.inlet) + b.addEdge(shape.out, f.in) + b.addEdge(f.out, shape.in) } } @@ -978,23 +978,23 @@ object GraphDSL extends GraphApply { def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val shape = b.add(bidi) val flow = b.add(f) - b.addEdge(flow.outlet, shape.in1) - b.addEdge(shape.out2, flow.inlet) + b.addEdge(flow.out, shape.in1) + b.addEdge(shape.out2, flow.in) shape } def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val flow = b.add(f) - b.addEdge(flow.outlet, bidi.in1) - b.addEdge(bidi.out2, flow.inlet) + b.addEdge(flow.out, bidi.in1) + b.addEdge(bidi.out2, flow.in) bidi } def <~>[M2](flow: Graph[FlowShape[O, I], M2])(implicit b: Builder[_]): Unit = { val shape = b.add(flow) val ff = b.add(f) - b.addEdge(shape.outlet, ff.inlet) - b.addEdge(ff.outlet, shape.inlet) + b.addEdge(shape.out, ff.in) + b.addEdge(ff.out, shape.in) } } @@ -1013,14 +1013,14 @@ object GraphDSL extends GraphApply { } def <~>(flow: FlowShape[O1, I2])(implicit b: Builder[_]): Unit = { - b.addEdge(bidi.out1, flow.inlet) - b.addEdge(flow.outlet, bidi.in2) + b.addEdge(bidi.out1, flow.in) + b.addEdge(flow.out, bidi.in2) } def <~>[M](f: Graph[FlowShape[O1, I2], M])(implicit b: Builder[_]): Unit = { val flow = b.add(f) - b.addEdge(bidi.out1, flow.inlet) - b.addEdge(flow.outlet, bidi.in2) + b.addEdge(bidi.out1, flow.in) + b.addEdge(flow.out, bidi.in2) } } @@ -1033,14 +1033,14 @@ object GraphDSL extends GraphApply { new PortOpsImpl(findOut(b, j, 0), b) implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O] = - new PortOpsImpl(f.outlet, b) + new PortOpsImpl(f.out, b) implicit final class SourceArrow[T](val s: Graph[SourceShape[T], _]) extends AnyVal with CombinerBase[T] { - override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s).outlet + override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s).out } implicit final class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] { - override def importAndGetPort(b: Builder[_]): Outlet[T] = s.outlet + override def importAndGetPort(b: Builder[_]): Outlet[T] = s.out } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 8d0d943dc3..dd3e17d1b1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -48,7 +48,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) val flowCopy = flow.module.carbonCopy new Source( module - .fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine) + .fuse(flowCopy, shape.out, flowCopy.shape.inlets.head, combine) .replaceShape(SourceShape(flowCopy.shape.outlets.head))) } } @@ -65,7 +65,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) */ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = { val sinkCopy = sink.module.carbonCopy - RunnableGraph(module.fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)) + RunnableGraph(module.fuse(sinkCopy, shape.out, sinkCopy.shape.inlets.head, combine)) } /** @@ -79,7 +79,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) // No need to copy here, op is a fresh instance new Source( module - .fuse(op, shape.outlet, op.inPort) + .fuse(op, shape.out, op.inPort) .replaceShape(SourceShape(op.outPort))) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 0de0e40974..bc589c86de 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -51,7 +51,7 @@ private[stream] object AbstractStage { // No need to refer to the handle in a private val val handler = new InHandler with OutHandler { override def onPush(): Unit = - try { currentStage.onPush(grab(shape.inlet), ctx) } catch { case NonFatal(ex) ⇒ onSupervision(ex) } + try { currentStage.onPush(grab(shape.in), ctx) } catch { case NonFatal(ex) ⇒ onSupervision(ex) } override def onPull(): Unit = currentStage.onPull(ctx) @@ -62,8 +62,8 @@ private[stream] object AbstractStage { override def onDownstreamFinish(): Unit = currentStage.onDownstreamFinish(ctx) } - setHandler(shape.inlet, handler) - setHandler(shape.outlet, handler) + setHandler(shape.in, handler) + setHandler(shape.out, handler) } private def onSupervision(ex: Throwable): Unit = { @@ -81,22 +81,22 @@ private[stream] object AbstractStage { } private def resetAfterSupervise(): Unit = { - val mustPull = currentStage.isDetached || isAvailable(shape.outlet) - if (!hasBeenPulled(shape.inlet) && mustPull) pull(shape.inlet) + val mustPull = currentStage.isDetached || isAvailable(shape.out) + if (!hasBeenPulled(shape.in) && mustPull) pull(shape.in) } override protected[stream] def beforePreStart(): Unit = { super.beforePreStart() - if (currentStage.isDetached) pull(shape.inlet) + if (currentStage.isDetached) pull(shape.in) } final override def push(elem: Out): DownstreamDirective = { - push(shape.outlet, elem) + push(shape.out, elem) null } final override def pull(): UpstreamDirective = { - pull(shape.inlet) + pull(shape.in) null } @@ -106,7 +106,7 @@ private[stream] object AbstractStage { } final override def pushAndFinish(elem: Out): DownstreamDirective = { - push(shape.outlet, elem) + push(shape.out, elem) completeStage() null } @@ -116,10 +116,10 @@ private[stream] object AbstractStage { null } - final override def isFinishing: Boolean = isClosed(shape.inlet) + final override def isFinishing: Boolean = isClosed(shape.in) final override def absorbTermination(): TerminationDirective = { - if (isClosed(shape.outlet)) { + if (isClosed(shape.out)) { val ex = new UnsupportedOperationException("It is not allowed to call absorbTermination() from onDownstreamFinish.") // This MUST be logged here, since the downstream has cancelled, i.e. there is noone to send onError to, the // stage is just about to finish so noone will catch it anyway just the interpreter @@ -127,29 +127,29 @@ private[stream] object AbstractStage { interpreter.log.error(ex.getMessage) throw ex // We still throw for correctness (although a finish() would also work here) } - if (isAvailable(shape.outlet)) currentStage.onPull(ctx) + if (isAvailable(shape.out)) currentStage.onPull(ctx) null } override def pushAndPull(elem: Out): FreeDirective = { - push(shape.outlet, elem) - pull(shape.inlet) + push(shape.out, elem) + pull(shape.in) null } final override def holdUpstreamAndPush(elem: Out): UpstreamDirective = { - push(shape.outlet, elem) + push(shape.out, elem) null } final override def holdDownstreamAndPull(): DownstreamDirective = { - pull(shape.inlet) + pull(shape.in) null } - final override def isHoldingDownstream: Boolean = isAvailable(shape.outlet) + final override def isHoldingDownstream: Boolean = isAvailable(shape.out) - final override def isHoldingUpstream: Boolean = !(isClosed(shape.inlet) || hasBeenPulled(shape.inlet)) + final override def isHoldingUpstream: Boolean = !(isClosed(shape.in) || hasBeenPulled(shape.in)) final override def holdDownstream(): DownstreamDirective = null