!str #19137: Rename inlet and outlet to in and out on Flow/Source/SinkShape
This commit is contained in:
parent
3d20915cf4
commit
b478d70964
44 changed files with 272 additions and 247 deletions
|
|
@ -42,7 +42,7 @@ object MaterializationBenchmark {
|
|||
for (_ <- 1 to numOfNestedGraphs) {
|
||||
flow = GraphDSL.create(flow) { b ⇒
|
||||
flow ⇒
|
||||
FlowShape(flow.inlet, flow.outlet)
|
||||
FlowShape(flow.in, flow.out)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -58,13 +58,13 @@ object MaterializationBenchmark {
|
|||
RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ source ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val flow = Flow[Unit].map(identity)
|
||||
var outlet: Outlet[Unit] = source.outlet
|
||||
var out: Outlet[Unit] = source.out
|
||||
for (i <- 0 until numOfFlows) {
|
||||
val flowShape = b.add(flow)
|
||||
outlet ~> flowShape
|
||||
outlet = flowShape.outlet
|
||||
out ~> flowShape
|
||||
out = flowShape.outlet
|
||||
}
|
||||
outlet ~> Sink.ignore
|
||||
out ~> Sink.ignore
|
||||
ClosedShape
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,6 @@ Update procedure
|
|||
``BidiFlow.fromFlows`` or ``BidiFlow.fromFlowsMat``
|
||||
5. Replace all uses of ``BidiFlow.apply()`` (Scala DSL) or ``BidiFlow.create()`` (Java DSL) when it converts two
|
||||
functions to a ``BidiFlow`` with ``BidiFlow.fromFunctions``
|
||||
|
||||
Example
|
||||
^^^^^^^
|
||||
|
||||
|
|
@ -86,6 +85,20 @@ Should be replaced by
|
|||
|
||||
.. includecode:: code/docs/MigrationsJava.java#bidi-wrap
|
||||
|
||||
|
||||
Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape``
|
||||
==========================================================================================================
|
||||
|
||||
The input and output ports of these shapes where called ``inlet()`` and ``outlet()`` compared to other shapes that
|
||||
consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``.
|
||||
|
||||
Update procedure
|
||||
----------------
|
||||
|
||||
Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the ports
|
||||
of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`.
|
||||
|
||||
|
||||
FlowGraph class and builder methods have been renamed
|
||||
=====================================================
|
||||
|
||||
|
|
|
|||
|
|
@ -181,7 +181,7 @@ class MigrationsScala extends AkkaSpec {
|
|||
// absorbTermination turns into the code below.
|
||||
// This emulates the behavior of the AsyncStage stage.
|
||||
private def absorbTermination(): Unit =
|
||||
if (isAvailable(shape.outlet)) getHandler(out).onPull()
|
||||
if (isAvailable(shape.out)) getHandler(out).onPull()
|
||||
|
||||
// The line below emulates the behavior of the AsyncStage holdingDownstream
|
||||
private def holdingDownstream(): Boolean =
|
||||
|
|
|
|||
|
|
@ -78,13 +78,13 @@ class CompositionDocSpec extends AkkaSpec {
|
|||
//#complex-graph
|
||||
import GraphDSL.Implicits._
|
||||
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
|
||||
val A: Outlet[Int] = builder.add(Source.single(0)).outlet
|
||||
val A: Outlet[Int] = builder.add(Source.single(0)).out
|
||||
val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
|
||||
val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
|
||||
val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
|
||||
val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
|
||||
val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
|
||||
val G: Inlet[Any] = builder.add(Sink.foreach(println)).inlet
|
||||
val G: Inlet[Any] = builder.add(Sink.foreach(println)).in
|
||||
|
||||
C <~ F
|
||||
A ~> B ~> C ~> F
|
||||
|
|
|
|||
|
|
@ -80,8 +80,8 @@ class FlowGraphDocSpec extends AkkaSpec {
|
|||
val broadcast = builder.add(Broadcast[Int](2))
|
||||
Source.single(1) ~> broadcast.in
|
||||
|
||||
broadcast.out(0) ~> sharedDoubler ~> topHS.inlet
|
||||
broadcast.out(1) ~> sharedDoubler ~> bottomHS.inlet
|
||||
broadcast.out(0) ~> sharedDoubler ~> topHS.in
|
||||
broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
|
||||
ClosedShape
|
||||
})
|
||||
//#flow-graph-reusing-a-flow
|
||||
|
|
@ -207,7 +207,7 @@ class FlowGraphDocSpec extends AkkaSpec {
|
|||
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
|
||||
implicit builder ⇒
|
||||
fold ⇒
|
||||
FlowShape(fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
|
||||
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
|
||||
})
|
||||
//#flow-graph-matvalue
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
|
|||
Source.single(1) ~> pm3.in(0)
|
||||
Source.single(2) ~> pm3.in(1)
|
||||
Source.single(3) ~> pm3.in(2)
|
||||
pm3.out ~> sink.inlet
|
||||
pm3.out ~> sink.in
|
||||
ClosedShape
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -108,7 +108,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
// then we continue using the echo-logic Flow
|
||||
echo.outlet ~> concat.in(1)
|
||||
|
||||
FlowShape(echo.inlet, concat.out)
|
||||
FlowShape(echo.in, concat.out)
|
||||
})
|
||||
|
||||
connection.handleWith(serverLogic)
|
||||
|
|
|
|||
|
|
@ -368,6 +368,18 @@ Update procedure
|
|||
|
||||
1. All custom shapes must use ``@uncheckedVariance`` on their ``Inlet`` and ``Outlet`` members.
|
||||
|
||||
Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape``
|
||||
==========================================================================================================
|
||||
|
||||
The input and output ports of these shapes where called ``inlet()`` and ``outlet()`` compared to other shapes that
|
||||
consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``.
|
||||
|
||||
Update procedure
|
||||
----------------
|
||||
|
||||
Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the ports
|
||||
of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`.
|
||||
|
||||
Semantic change in ``isHoldingUpstream`` in the DetachedStage DSL
|
||||
=================================================================
|
||||
|
||||
|
|
@ -557,7 +569,7 @@ should be replaced by
|
|||
.. includecode:: code/docs/MigrationsScala.scala#file-source-sink
|
||||
|
||||
InputStreamSource and OutputStreamSink
|
||||
============================================
|
||||
======================================
|
||||
|
||||
Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues.
|
||||
|
||||
|
|
|
|||
|
|
@ -100,8 +100,8 @@ private[http] object OutgoingConnectionBlueprint {
|
|||
|
||||
BidiShape(
|
||||
methodBypassFanout.in,
|
||||
wrapTls.outlet,
|
||||
unwrapTls.inlet,
|
||||
wrapTls.out,
|
||||
unwrapTls.in,
|
||||
terminationFanout.out(1))
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ private object PoolSlot {
|
|||
|
||||
slotProcessor ~> split.in
|
||||
|
||||
new FanOutShape2(slotProcessor.inlet,
|
||||
new FanOutShape2(slotProcessor.in,
|
||||
split.out(0).collect { case ResponseDelivery(r) ⇒ r }.outlet,
|
||||
split.out(1).collect { case r: RawSlotEvent ⇒ r }.outlet)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ private object RenderSupport {
|
|||
frst ⇒
|
||||
import GraphDSL.Implicits._
|
||||
second ~> Sink.cancelled
|
||||
SourceShape(frst.outlet)
|
||||
SourceShape(frst.out)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -146,8 +146,8 @@ private[http] object Websocket {
|
|||
|
||||
BidiShape(
|
||||
split.in,
|
||||
messagePreparation.outlet,
|
||||
messageRendering.inlet,
|
||||
messagePreparation.out,
|
||||
messageRendering.in,
|
||||
merge.out)
|
||||
}.named("ws-message-api"))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -122,9 +122,9 @@ object WebsocketClientBlueprint {
|
|||
wsIn.outlet ~> httpRequestBytesAndThenWSBytes
|
||||
|
||||
BidiShape(
|
||||
networkIn.inlet,
|
||||
networkIn.outlet,
|
||||
wsIn.inlet,
|
||||
networkIn.in,
|
||||
networkIn.out,
|
||||
wsIn.in,
|
||||
httpRequestBytesAndThenWSBytes.out)
|
||||
}) mapMaterializedValue (_ ⇒ result.future)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ public class BidiFlowTest extends StreamTest {
|
|||
}
|
||||
}));
|
||||
return new BidiShape<Integer, Long, ByteString, String>(top
|
||||
.inlet(), top.outlet(), bottom.inlet(), bottom.outlet());
|
||||
.in(), top.out(), bottom.in(), bottom.out());
|
||||
}
|
||||
}));
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ public class BidiFlowTest extends StreamTest {
|
|||
}
|
||||
}));
|
||||
return new BidiShape<Long, Integer, String, ByteString>(top
|
||||
.inlet(), top.outlet(), bottom.inlet(), bottom.outlet());
|
||||
.in(), top.out(), bottom.in(), bottom.out());
|
||||
}
|
||||
}));
|
||||
|
||||
|
|
@ -109,7 +109,7 @@ public class BidiFlowTest extends StreamTest {
|
|||
}
|
||||
}));
|
||||
return new BidiShape<Integer, Long, ByteString, String>(top
|
||||
.inlet(), top.outlet(), bottom.inlet(), bottom.outlet());
|
||||
.in(), top.out(), bottom.in(), bottom.out());
|
||||
}
|
||||
}));
|
||||
|
||||
|
|
@ -248,7 +248,7 @@ public class BidiFlowTest extends StreamTest {
|
|||
b.from(bcast).to(sink)
|
||||
.from(b.add(Source.single(1))).viaFanOut(bcast).toFanIn(merge)
|
||||
.from(flow).toFanIn(merge);
|
||||
return new FlowShape<String, Integer>(flow.inlet(), merge.out());
|
||||
return new FlowShape<String, Integer>(flow.in(), merge.out());
|
||||
}
|
||||
}));
|
||||
final Flow<Long, ByteString, Future<List<Long>>> right = Flow.fromGraph(GraphDSL.create(
|
||||
|
|
@ -258,7 +258,7 @@ public class BidiFlowTest extends StreamTest {
|
|||
SinkShape<List<Long>> sink) throws Exception {
|
||||
final FlowShape<Long, List<Long>> flow = b.add(Flow.of(Long.class).grouped(10));
|
||||
b.from(flow).to(sink);
|
||||
return new FlowShape<Long, ByteString>(flow.inlet(), b.add(Source.single(ByteString.fromString("10"))).outlet());
|
||||
return new FlowShape<Long, ByteString>(flow.in(), b.add(Source.single(ByteString.fromString("10"))).out());
|
||||
}
|
||||
}));
|
||||
final Pair<Pair<Future<Integer>, Future<Integer>>, Future<List<Long>>> result =
|
||||
|
|
|
|||
|
|
@ -364,8 +364,8 @@ public class FlowTest extends StreamTest {
|
|||
|
||||
RunnableGraph.fromGraph(GraphDSL.create(new Function<Builder<BoxedUnit>, ClosedShape>(){
|
||||
public ClosedShape apply(Builder<BoxedUnit> b) {
|
||||
final Outlet<String> in1 = b.add(Source.from(input1)).outlet();
|
||||
final Outlet<Integer> in2 = b.add(Source.from(input2)).outlet();
|
||||
final Outlet<String> in1 = b.add(Source.from(input1)).out();
|
||||
final Outlet<Integer> in2 = b.add(Source.from(input2)).out();
|
||||
final FanInShape2<String, Integer, Pair<String, Integer>> zip = b.add(Zip.<String, Integer>create());
|
||||
final SinkShape<Pair<String, Integer>> out =
|
||||
b.add(Sink.foreach(new Procedure<Pair<String, Integer>>() {
|
||||
|
|
|
|||
|
|
@ -358,7 +358,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic
|
|||
val bcast = b.add(Broadcast[String](2))
|
||||
|
||||
source1 ~> merge.in(0)
|
||||
source2.outlet ~> merge.in(1)
|
||||
source2.out ~> merge.in(1)
|
||||
|
||||
merge.out.map(_.toString) ~> bcast.in
|
||||
|
||||
|
|
|
|||
|
|
@ -72,8 +72,8 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
val sink = new DownstreamProbe[Int]("sink")
|
||||
|
||||
builder(detach)
|
||||
.connect(source, detach.shape.inlet)
|
||||
.connect(detach.shape.outlet, sink)
|
||||
.connect(source, detach.shape.in)
|
||||
.connect(detach.shape.out, sink)
|
||||
.init()
|
||||
|
||||
lastEvents() should ===(Set.empty)
|
||||
|
|
@ -313,8 +313,8 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
.connect(source, merge.in(0))
|
||||
.connect(merge.out, balance.in)
|
||||
.connect(balance.out(0), sink)
|
||||
.connect(balance.out(1), detach.shape.inlet)
|
||||
.connect(detach.shape.outlet, merge.in(1))
|
||||
.connect(balance.out(1), detach.shape.in)
|
||||
.connect(detach.shape.out, merge.in(1))
|
||||
.init()
|
||||
|
||||
lastEvents() should ===(Set.empty)
|
||||
|
|
@ -351,8 +351,8 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
|||
Attributes.none)
|
||||
|
||||
builder(buffer)
|
||||
.connect(source, buffer.shape.inlet)
|
||||
.connect(buffer.shape.outlet, sink)
|
||||
.connect(source, buffer.shape.in)
|
||||
.connect(buffer.shape.out, sink)
|
||||
.init()
|
||||
|
||||
stepAll()
|
||||
|
|
|
|||
|
|
@ -339,9 +339,9 @@ trait GraphInterpreterSpecKit {
|
|||
|
||||
while (i < ops.length) {
|
||||
val stage = ops(i).asInstanceOf[PushPullGraphStage[_, _, _]]
|
||||
ins(i) = stage.shape.inlet
|
||||
ins(i) = stage.shape.in
|
||||
inOwners(i) = i
|
||||
outs(i + 1) = stage.shape.outlet
|
||||
outs(i + 1) = stage.shape.out
|
||||
outOwners(i + 1) = i
|
||||
i += 1
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,8 +66,8 @@ class KeepGoingStageSpec extends AkkaSpec {
|
|||
} finally listener.foreach(_ ! EndOfEventHandler)
|
||||
}
|
||||
|
||||
setHandler(shape.inlet, new InHandler {
|
||||
override def onPush(): Unit = pull(shape.inlet)
|
||||
setHandler(shape.in, new InHandler {
|
||||
override def onPush(): Unit = pull(shape.in)
|
||||
|
||||
// Ignore finish
|
||||
override def onUpstreamFinish(): Unit = listener.foreach(_ ! UpstreamCompleted)
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
|||
|
||||
val top = b.add(Flow[Int].map(x ⇒ x.toLong + 2))
|
||||
val bottom = b.add(Flow[ByteString].map(_.decodeString("UTF-8")))
|
||||
BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet)
|
||||
BidiShape(top.in, top.out, bottom.in, bottom.out)
|
||||
})
|
||||
|
||||
val str = "Hello World"
|
||||
|
|
@ -97,13 +97,13 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
|||
bcast ~> sink
|
||||
Source.single(1) ~> bcast ~> merge
|
||||
flow ~> merge
|
||||
FlowShape(flow.inlet, merge.out)
|
||||
FlowShape(flow.in, merge.out)
|
||||
})
|
||||
val right = Flow.fromGraph(GraphDSL.create(Sink.head[immutable.Seq[Long]]) { implicit b ⇒
|
||||
sink ⇒
|
||||
val flow = b.add(Flow[Long].grouped(10))
|
||||
flow ~> sink
|
||||
FlowShape(flow.inlet, b.add(Source.single(ByteString("10"))).outlet)
|
||||
FlowShape(flow.in, b.add(Source.single(ByteString("10"))).out)
|
||||
})
|
||||
val ((l, m), r) = left.joinMat(bidiMat)(Keep.both).joinMat(right)(Keep.both).run()
|
||||
Await.result(l, 1.second) should ===(1)
|
||||
|
|
|
|||
|
|
@ -69,9 +69,9 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
|
|||
val assembly = new GraphAssembly(
|
||||
Array(stage),
|
||||
Array(Attributes.none),
|
||||
Array(stage.shape.inlet, null),
|
||||
Array(stage.shape.in, null),
|
||||
Array(0, -1),
|
||||
Array(null, stage.shape.outlet),
|
||||
Array(null, stage.shape.out),
|
||||
Array(-1, 0))
|
||||
|
||||
val (inHandlers, outHandlers, logics) =
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
import GraphDSL.Implicits._
|
||||
FlowShape(partial.inlet, partial.outlet.map(_.toInt).outlet)
|
||||
FlowShape(partial.in, partial.out.map(_.toInt).outlet)
|
||||
})
|
||||
|
||||
source1.via(flow).to(Sink(probe)).run()
|
||||
|
|
@ -77,7 +77,7 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val probe = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒ FlowShape(partial.inlet, partial.outlet)
|
||||
partial ⇒ FlowShape(partial.in, partial.out)
|
||||
})
|
||||
|
||||
source1.via(flow).map(_.toInt).to(Sink(probe)).run()
|
||||
|
|
@ -90,12 +90,12 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
|
||||
val flow1 = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
FlowShape(partial.inlet, partial.outlet)
|
||||
FlowShape(partial.in, partial.out)
|
||||
})
|
||||
|
||||
val flow2 = Flow.fromGraph(GraphDSL.create(Flow[String].map(_.toInt)) { implicit b ⇒
|
||||
importFlow ⇒
|
||||
FlowShape(importFlow.inlet, importFlow.outlet)
|
||||
FlowShape(importFlow.in, importFlow.out)
|
||||
})
|
||||
|
||||
source1.via(flow1).via(flow2).to(Sink(probe)).run()
|
||||
|
|
@ -107,7 +107,7 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val probe = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
val flow = Flow.fromGraph(GraphDSL.create(Flow[Int].map(_ * 2)) { implicit b ⇒
|
||||
importFlow ⇒ FlowShape(importFlow.inlet, importFlow.outlet)
|
||||
importFlow ⇒ FlowShape(importFlow.in, importFlow.out)
|
||||
})
|
||||
|
||||
RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒
|
||||
|
|
@ -127,8 +127,8 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val source = Source.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
import GraphDSL.Implicits._
|
||||
source1 ~> partial.inlet
|
||||
SourceShape(partial.outlet.map(_.toInt).outlet)
|
||||
source1 ~> partial.in
|
||||
SourceShape(partial.out.map(_.toInt).outlet)
|
||||
})
|
||||
|
||||
source.to(Sink(probe)).run()
|
||||
|
|
@ -152,8 +152,8 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val source = Source.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
import GraphDSL.Implicits._
|
||||
source1 ~> partial.inlet
|
||||
SourceShape(partial.outlet)
|
||||
source1 ~> partial.in
|
||||
SourceShape(partial.out)
|
||||
})
|
||||
|
||||
source.map(_.toInt).to(Sink(probe)).run()
|
||||
|
|
@ -167,13 +167,13 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val source = Source.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
import GraphDSL.Implicits._
|
||||
source1 ~> partial.inlet
|
||||
SourceShape(partial.outlet)
|
||||
source1 ~> partial.in
|
||||
SourceShape(partial.out)
|
||||
})
|
||||
|
||||
val flow = Flow.fromGraph(GraphDSL.create(Flow[String].map(_.toInt)) { implicit b ⇒
|
||||
importFlow ⇒
|
||||
FlowShape(importFlow.inlet, importFlow.outlet)
|
||||
FlowShape(importFlow.in, importFlow.out)
|
||||
})
|
||||
|
||||
source.via(flow).to(Sink(probe)).run()
|
||||
|
|
@ -187,16 +187,16 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val source = Source.fromGraph(GraphDSL.create(Source(1 to 5)) { implicit b ⇒
|
||||
s ⇒
|
||||
import GraphDSL.Implicits._
|
||||
SourceShape(s.outlet.map(_ * 2).outlet)
|
||||
SourceShape(s.out.map(_ * 2).outlet)
|
||||
})
|
||||
|
||||
RunnableGraph.fromGraph(GraphDSL.create(source, source)(Keep.both) { implicit b ⇒
|
||||
(s1, s2) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val merge = b.add(Merge[Int](2))
|
||||
s1.outlet ~> merge.in(0)
|
||||
s1.out ~> merge.in(0)
|
||||
merge.out ~> Sink(probe)
|
||||
s2.outlet.map(_ * 10) ~> merge.in(1)
|
||||
s2.out.map(_ * 10) ~> merge.in(1)
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -211,8 +211,8 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val sink = Sink.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
import GraphDSL.Implicits._
|
||||
partial.outlet.map(_.toInt) ~> Sink(probe)
|
||||
SinkShape(partial.inlet)
|
||||
partial.out.map(_.toInt) ~> Sink(probe)
|
||||
SinkShape(partial.in)
|
||||
})
|
||||
|
||||
source1.to(sink).run()
|
||||
|
|
@ -225,7 +225,7 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val pubSink = Sink.publisher[Int](false)
|
||||
|
||||
val sink = Sink.fromGraph(GraphDSL.create(pubSink) { implicit b ⇒
|
||||
p ⇒ SinkShape(p.inlet)
|
||||
p ⇒ SinkShape(p.in)
|
||||
})
|
||||
|
||||
val mm = source1.runWith(sink)
|
||||
|
|
@ -240,9 +240,9 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val sink = Sink.fromGraph(GraphDSL.create(partialGraph, Flow[String].map(_.toInt))(Keep.both) { implicit b ⇒
|
||||
(partial, flow) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
flow.outlet ~> partial.inlet
|
||||
partial.outlet.map(_.toInt) ~> Sink(probe)
|
||||
SinkShape(flow.inlet)
|
||||
flow.out ~> partial.in
|
||||
partial.out.map(_.toInt) ~> Sink(probe)
|
||||
SinkShape(flow.in)
|
||||
})
|
||||
|
||||
val iSink = Flow[Int].map(_.toString).to(sink)
|
||||
|
|
@ -257,14 +257,14 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
|
||||
val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
FlowShape(partial.inlet, partial.outlet)
|
||||
FlowShape(partial.in, partial.out)
|
||||
})
|
||||
|
||||
val sink = Sink.fromGraph(GraphDSL.create(Flow[String].map(_.toInt)) { implicit b ⇒
|
||||
flow ⇒
|
||||
import GraphDSL.Implicits._
|
||||
flow.outlet ~> Sink(probe)
|
||||
SinkShape(flow.inlet)
|
||||
flow.out ~> Sink(probe)
|
||||
SinkShape(flow.in)
|
||||
})
|
||||
|
||||
source1.via(flow).to(sink).run()
|
||||
|
|
@ -282,28 +282,28 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
val flow = Flow.fromGraph(GraphDSL.create(partialGraph) { implicit b ⇒
|
||||
partial ⇒
|
||||
import GraphDSL.Implicits._
|
||||
FlowShape(partial.inlet, partial.outlet.map(_.toInt).outlet)
|
||||
FlowShape(partial.in, partial.out.map(_.toInt).outlet)
|
||||
})
|
||||
|
||||
val source = Source.fromGraph(GraphDSL.create(Flow[Int].map(_.toString), inSource)(Keep.right) { implicit b ⇒
|
||||
(flow, src) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
src.outlet ~> flow.inlet
|
||||
SourceShape(flow.outlet)
|
||||
src.out ~> flow.in
|
||||
SourceShape(flow.out)
|
||||
})
|
||||
|
||||
val sink = Sink.fromGraph(GraphDSL.create(Flow[String].map(_.toInt), outSink)(Keep.right) { implicit b ⇒
|
||||
(flow, snk) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
flow.outlet ~> snk.inlet
|
||||
SinkShape(flow.inlet)
|
||||
flow.out ~> snk.in
|
||||
SinkShape(flow.in)
|
||||
})
|
||||
|
||||
val (m1, m2, m3) = RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)(Tuple3.apply) { implicit b ⇒
|
||||
(src, f, snk) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
src.outlet.map(_.toInt) ~> f.inlet
|
||||
f.outlet.map(_.toString) ~> snk.inlet
|
||||
src.out.map(_.toInt) ~> f.in
|
||||
f.out.map(_.toString) ~> snk.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -322,18 +322,18 @@ class GraphFlowSpec extends AkkaSpec {
|
|||
|
||||
val source = Source.fromGraph(GraphDSL.create(inSource) { implicit b ⇒
|
||||
src ⇒
|
||||
SourceShape(src.outlet)
|
||||
SourceShape(src.out)
|
||||
})
|
||||
|
||||
val sink = Sink.fromGraph(GraphDSL.create(outSink) { implicit b ⇒
|
||||
snk ⇒
|
||||
SinkShape(snk.inlet)
|
||||
SinkShape(snk.in)
|
||||
})
|
||||
|
||||
val (m1, m2) = RunnableGraph.fromGraph(GraphDSL.create(source, sink)(Keep.both) { implicit b ⇒
|
||||
(src, snk) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
src.outlet ~> snk.inlet
|
||||
src.out ~> snk.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -63,11 +63,11 @@ class GraphBroadcastSpec extends AkkaSpec {
|
|||
(p1, p2, p3, p4, p5) ⇒
|
||||
val bcast = b.add(Broadcast[Int](5))
|
||||
Source(List(1, 2, 3)) ~> bcast.in
|
||||
bcast.out(0).grouped(5) ~> p1.inlet
|
||||
bcast.out(1).grouped(5) ~> p2.inlet
|
||||
bcast.out(2).grouped(5) ~> p3.inlet
|
||||
bcast.out(3).grouped(5) ~> p4.inlet
|
||||
bcast.out(4).grouped(5) ~> p5.inlet
|
||||
bcast.out(0).grouped(5) ~> p1.in
|
||||
bcast.out(1).grouped(5) ~> p2.in
|
||||
bcast.out(2).grouped(5) ~> p3.in
|
||||
bcast.out(3).grouped(5) ~> p4.in
|
||||
bcast.out(4).grouped(5) ~> p5.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -94,28 +94,28 @@ class GraphBroadcastSpec extends AkkaSpec {
|
|||
(p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16, p17, p18, p19, p20, p21, p22) ⇒
|
||||
val bcast = b.add(Broadcast[Int](22))
|
||||
Source(List(1, 2, 3)) ~> bcast.in
|
||||
bcast.out(0).grouped(5) ~> p1.inlet
|
||||
bcast.out(1).grouped(5) ~> p2.inlet
|
||||
bcast.out(2).grouped(5) ~> p3.inlet
|
||||
bcast.out(3).grouped(5) ~> p4.inlet
|
||||
bcast.out(4).grouped(5) ~> p5.inlet
|
||||
bcast.out(5).grouped(5) ~> p6.inlet
|
||||
bcast.out(6).grouped(5) ~> p7.inlet
|
||||
bcast.out(7).grouped(5) ~> p8.inlet
|
||||
bcast.out(8).grouped(5) ~> p9.inlet
|
||||
bcast.out(9).grouped(5) ~> p10.inlet
|
||||
bcast.out(10).grouped(5) ~> p11.inlet
|
||||
bcast.out(11).grouped(5) ~> p12.inlet
|
||||
bcast.out(12).grouped(5) ~> p13.inlet
|
||||
bcast.out(13).grouped(5) ~> p14.inlet
|
||||
bcast.out(14).grouped(5) ~> p15.inlet
|
||||
bcast.out(15).grouped(5) ~> p16.inlet
|
||||
bcast.out(16).grouped(5) ~> p17.inlet
|
||||
bcast.out(17).grouped(5) ~> p18.inlet
|
||||
bcast.out(18).grouped(5) ~> p19.inlet
|
||||
bcast.out(19).grouped(5) ~> p20.inlet
|
||||
bcast.out(20).grouped(5) ~> p21.inlet
|
||||
bcast.out(21).grouped(5) ~> p22.inlet
|
||||
bcast.out(0).grouped(5) ~> p1.in
|
||||
bcast.out(1).grouped(5) ~> p2.in
|
||||
bcast.out(2).grouped(5) ~> p3.in
|
||||
bcast.out(3).grouped(5) ~> p4.in
|
||||
bcast.out(4).grouped(5) ~> p5.in
|
||||
bcast.out(5).grouped(5) ~> p6.in
|
||||
bcast.out(6).grouped(5) ~> p7.in
|
||||
bcast.out(7).grouped(5) ~> p8.in
|
||||
bcast.out(8).grouped(5) ~> p9.in
|
||||
bcast.out(9).grouped(5) ~> p10.in
|
||||
bcast.out(10).grouped(5) ~> p11.in
|
||||
bcast.out(11).grouped(5) ~> p12.in
|
||||
bcast.out(12).grouped(5) ~> p13.in
|
||||
bcast.out(13).grouped(5) ~> p14.in
|
||||
bcast.out(14).grouped(5) ~> p15.in
|
||||
bcast.out(15).grouped(5) ~> p16.in
|
||||
bcast.out(16).grouped(5) ~> p17.in
|
||||
bcast.out(17).grouped(5) ~> p18.in
|
||||
bcast.out(18).grouped(5) ~> p19.in
|
||||
bcast.out(19).grouped(5) ~> p20.in
|
||||
bcast.out(20).grouped(5) ~> p21.in
|
||||
bcast.out(21).grouped(5) ~> p22.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -83,16 +83,16 @@ class GraphMatValueSpec extends AkkaSpec {
|
|||
(s1, s2) ⇒
|
||||
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
|
||||
|
||||
s1.outlet.mapAsync(4)(identity) ~> zip.in0
|
||||
s2.outlet.mapAsync(4)(identity).map(_ * 100) ~> zip.in1
|
||||
s1.out.mapAsync(4)(identity) ~> zip.in0
|
||||
s2.out.mapAsync(4)(identity).map(_ * 100) ~> zip.in1
|
||||
SourceShape(zip.out)
|
||||
})
|
||||
|
||||
val compositeSource2 = Source.fromGraph(GraphDSL.create(compositeSource1, compositeSource1)(Keep.both) { implicit b ⇒
|
||||
(s1, s2) ⇒
|
||||
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
|
||||
s1.outlet ~> zip.in0
|
||||
s2.outlet.map(_ * 10000) ~> zip.in1
|
||||
s1.out ~> zip.in0
|
||||
s2.out.map(_ * 10000) ~> zip.in1
|
||||
SourceShape(zip.out)
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ class GraphMergePreferredSpec extends TwoStreamsSetup {
|
|||
val merge = b.add(MergePreferred[Int](3))
|
||||
preferred ~> merge.preferred
|
||||
|
||||
merge.out.grouped(numElements * 2) ~> sink.inlet
|
||||
merge.out.grouped(numElements * 2) ~> sink.in
|
||||
aux ~> merge.in(0)
|
||||
aux ~> merge.in(1)
|
||||
aux ~> merge.in(2)
|
||||
|
|
@ -53,7 +53,7 @@ class GraphMergePreferredSpec extends TwoStreamsSetup {
|
|||
val merge = b.add(MergePreferred[Int](3))
|
||||
Source(1 to 100) ~> merge.preferred
|
||||
|
||||
merge.out.grouped(500) ~> sink.inlet
|
||||
merge.out.grouped(500) ~> sink.in
|
||||
Source(101 to 200) ~> merge.in(0)
|
||||
Source(201 to 300) ~> merge.in(1)
|
||||
Source(301 to 400) ~> merge.in(2)
|
||||
|
|
|
|||
|
|
@ -157,8 +157,8 @@ class GraphMergeSpec extends TwoStreamsSetup {
|
|||
val (graphSubscriber1, graphSubscriber2) = RunnableGraph.fromGraph(GraphDSL.create(src1, src2)((_, _)) { implicit b ⇒
|
||||
(s1, s2) ⇒
|
||||
val merge = b.add(Merge[Int](2))
|
||||
s1.outlet ~> merge.in(0)
|
||||
s2.outlet ~> merge.in(1)
|
||||
s1.out ~> merge.in(0)
|
||||
s2.out ~> merge.in(1)
|
||||
merge.out ~> Sink(down)
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual
|
|||
Source(List(1, 2, 3)) ~> bcast.in
|
||||
bcast.out(0) ~> merge.in(0)
|
||||
bcast.out(1).map(_ + 3) ~> merge.in(1)
|
||||
merge.out.grouped(10) ~> sink.inlet
|
||||
merge.out.grouped(10) ~> sink.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual
|
|||
|
||||
for (i ← 0 until 5) balance.out(i) ~> merge.in(i)
|
||||
|
||||
merge.out.grouped(elements.size * 2) ~> sink.inlet
|
||||
merge.out.grouped(elements.size * 2) ~> sink.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -118,15 +118,15 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual
|
|||
|
||||
// Second layer
|
||||
m11.out ~> b11.in
|
||||
b11.out(0).grouped(1000) ~> sink2.inlet // Vertex 2 is omitted since it has only one in and out
|
||||
b11.out(0).grouped(1000) ~> sink2.in // Vertex 2 is omitted since it has only one in and out
|
||||
b11.out(1) ~> m9.in(0)
|
||||
b11.out(2) ~> m10.in(1)
|
||||
|
||||
m8.out ~> m9.in(1)
|
||||
|
||||
// Third layer
|
||||
m9.out.grouped(1000) ~> sink9.inlet
|
||||
m10.out.grouped(1000) ~> sink10.inlet
|
||||
m9.out.grouped(1000) ~> sink9.in
|
||||
m10.out.grouped(1000) ~> sink10.in
|
||||
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
|
@ -147,7 +147,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec with ConversionCheckedTripleEqual
|
|||
Source(List(1, 2, 3)).map(_ * 2) ~> bcast.in
|
||||
bcast.out(0) ~> merge.in(0)
|
||||
bcast.out(1).map(_ + 3) ~> merge.in(1)
|
||||
merge.out.grouped(10) ~> sink.inlet
|
||||
merge.out.grouped(10) ~> sink.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -29,9 +29,9 @@ class GraphPartialSpec extends AkkaSpec {
|
|||
|
||||
val (_, _, result) = RunnableGraph.fromGraph(GraphDSL.create(doubler, doubler, Sink.head[Seq[Int]])(Tuple3.apply) { implicit b ⇒
|
||||
(d1, d2, sink) ⇒
|
||||
Source(List(1, 2, 3)) ~> d1.inlet
|
||||
d1.outlet ~> d2.inlet
|
||||
d2.outlet.grouped(100) ~> sink.inlet
|
||||
Source(List(1, 2, 3)) ~> d1.in
|
||||
d1.out ~> d2.in
|
||||
d2.out.grouped(100) ~> sink.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -46,15 +46,15 @@ class GraphPartialSpec extends AkkaSpec {
|
|||
|
||||
bcast.out(0) ~> zip.in0
|
||||
bcast.out(1) ~> zip.in1
|
||||
bcast.out(2).grouped(100) ~> sink.inlet
|
||||
bcast.out(2).grouped(100) ~> sink.in
|
||||
FlowShape(bcast.in, zip.out)
|
||||
}
|
||||
|
||||
val (sub1, sub2, result) = RunnableGraph.fromGraph(GraphDSL.create(doubler, doubler, Sink.head[Seq[Int]])(Tuple3.apply) { implicit b ⇒
|
||||
(d1, d2, sink) ⇒
|
||||
Source(List(1, 2, 3)) ~> d1.inlet
|
||||
d1.outlet ~> d2.inlet
|
||||
d2.outlet.grouped(100) ~> sink.inlet
|
||||
Source(List(1, 2, 3)) ~> d1.in
|
||||
d1.out ~> d2.in
|
||||
d2.out.grouped(100) ~> sink.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -74,19 +74,19 @@ class GraphPartialSpec extends AkkaSpec {
|
|||
|
||||
bcast.out(0) ~> zip.in0
|
||||
bcast.out(1) ~> zip.in1
|
||||
bcast.out(2) ~> s1.inlet
|
||||
bcast.out(2) ~> s1.in
|
||||
|
||||
zip.out ~> bcast2.in
|
||||
bcast2.out(0) ~> s2.inlet
|
||||
bcast2.out(0) ~> s2.in
|
||||
|
||||
FlowShape(bcast.in, bcast2.out(1))
|
||||
}
|
||||
|
||||
val (sub1, sub2, result) = RunnableGraph.fromGraph(GraphDSL.create(doubler, doubler, Sink.head[Seq[Int]])(Tuple3.apply) { implicit b ⇒
|
||||
(d1, d2, sink) ⇒
|
||||
Source(List(1, 2, 3)) ~> d1.inlet
|
||||
d1.outlet ~> d2.inlet
|
||||
d2.outlet.grouped(100) ~> sink.inlet
|
||||
Source(List(1, 2, 3)) ~> d1.in
|
||||
d1.out ~> d2.in
|
||||
d2.out.grouped(100) ~> sink.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
@ -100,14 +100,14 @@ class GraphPartialSpec extends AkkaSpec {
|
|||
"be able to expose the ports of imported graphs" in {
|
||||
val p = GraphDSL.create(Flow[Int].map(_ + 1)) { implicit b ⇒
|
||||
flow ⇒
|
||||
FlowShape(flow.inlet, flow.outlet)
|
||||
FlowShape(flow.in, flow.out)
|
||||
}
|
||||
|
||||
val fut = RunnableGraph.fromGraph(GraphDSL.create(Sink.head[Int], p)(Keep.left) { implicit b ⇒
|
||||
(sink, flow) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
Source.single(0) ~> flow.inlet
|
||||
flow.outlet ~> sink.inlet
|
||||
Source.single(0) ~> flow.in
|
||||
flow.out ~> sink.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ class PublisherSinkSpec extends AkkaSpec {
|
|||
val bcast = b.add(Broadcast[Int](2))
|
||||
|
||||
Source(0 to 5) ~> bcast.in
|
||||
bcast.out(0).map(_ * 2) ~> p1.inlet
|
||||
bcast.out(1) ~> p2.inlet
|
||||
bcast.out(0).map(_ * 2) ~> p1.in
|
||||
bcast.out(1) ~> p2.in
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
|||
"work from Inlets" in {
|
||||
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒
|
||||
s ⇒
|
||||
s.inlet <~ source
|
||||
s.in <~ source
|
||||
ClosedShape
|
||||
}).run(), 1.second) should ===(Seq(1, 2, 3))
|
||||
}
|
||||
|
|
@ -44,7 +44,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
|||
|
||||
"not work from Outlets" in {
|
||||
RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒
|
||||
val o: Outlet[Int] = b.add(source).outlet
|
||||
val o: Outlet[Int] = b.add(source).out
|
||||
"o <~ source" shouldNot compile
|
||||
sink <~ o
|
||||
ClosedShape
|
||||
|
|
@ -99,7 +99,7 @@ class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
|
|||
"work towards Outlets" in {
|
||||
Await.result(RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b ⇒
|
||||
s ⇒
|
||||
val o: Outlet[Int] = b.add(source).outlet
|
||||
val o: Outlet[Int] = b.add(source).out
|
||||
s <~ o
|
||||
ClosedShape
|
||||
}).run(), 1.second) should ===(Seq(1, 2, 3))
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ class SinkSpec extends AkkaSpec {
|
|||
val sink = Sink.fromGraph(GraphDSL.create(Sink(probes(0))) { implicit b ⇒
|
||||
s0 ⇒
|
||||
val bcast = b.add(Broadcast[Int](3))
|
||||
bcast.out(0) ~> Flow[Int].filter(_ == 0) ~> s0.inlet
|
||||
bcast.out(0) ~> Flow[Int].filter(_ == 0) ~> s0.in
|
||||
for (i ← 1 to 2) bcast.out(i).filter(_ == i) ~> Sink(probes(i))
|
||||
SinkShape(bcast.in)
|
||||
})
|
||||
|
|
@ -52,8 +52,8 @@ class SinkSpec extends AkkaSpec {
|
|||
val sink = Sink.fromGraph(GraphDSL.create(Sink(probes(0)), Sink(probes(1)))(List(_, _)) { implicit b ⇒
|
||||
(s0, s1) ⇒
|
||||
val bcast = b.add(Broadcast[Int](3))
|
||||
bcast.out(0).filter(_ == 0) ~> s0.inlet
|
||||
bcast.out(1).filter(_ == 1) ~> s1.inlet
|
||||
bcast.out(0).filter(_ == 0) ~> s0.in
|
||||
bcast.out(1).filter(_ == 1) ~> s1.in
|
||||
bcast.out(2).filter(_ == 2) ~> Sink(probes(2))
|
||||
SinkShape(bcast.in)
|
||||
})
|
||||
|
|
@ -70,9 +70,9 @@ class SinkSpec extends AkkaSpec {
|
|||
val sink = Sink.fromGraph(GraphDSL.create(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(List(_, _, _)) { implicit b ⇒
|
||||
(s0, s1, s2) ⇒
|
||||
val bcast = b.add(Broadcast[Int](3))
|
||||
bcast.out(0).filter(_ == 0) ~> s0.inlet
|
||||
bcast.out(1).filter(_ == 1) ~> s1.inlet
|
||||
bcast.out(2).filter(_ == 2) ~> s2.inlet
|
||||
bcast.out(0).filter(_ == 0) ~> s0.in
|
||||
bcast.out(1).filter(_ == 1) ~> s1.in
|
||||
bcast.out(2).filter(_ == 2) ~> s2.in
|
||||
SinkShape(bcast.in)
|
||||
})
|
||||
Source(List(0, 1, 2)).runWith(sink)
|
||||
|
|
|
|||
|
|
@ -141,11 +141,11 @@ class SourceSpec extends AkkaSpec {
|
|||
(i0, i1, i2, i3, i4) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val m = b.add(Merge[Int](5))
|
||||
i0.outlet ~> m.in(0)
|
||||
i1.outlet ~> m.in(1)
|
||||
i2.outlet ~> m.in(2)
|
||||
i3.outlet ~> m.in(3)
|
||||
i4.outlet ~> m.in(4)
|
||||
i0.out ~> m.in(0)
|
||||
i1.out ~> m.in(1)
|
||||
i2.out ~> m.in(2)
|
||||
i3.out ~> m.in(3)
|
||||
i4.out ~> m.in(4)
|
||||
SourceShape(m.out)
|
||||
}).to(Sink(out)).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -232,11 +232,11 @@ case class AmorphousShape(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Se
|
|||
* A Source [[Shape]] has exactly one output and no inputs, it models a source
|
||||
* of data.
|
||||
*/
|
||||
final case class SourceShape[+T](outlet: Outlet[T @uncheckedVariance]) extends Shape {
|
||||
final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {
|
||||
override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
|
||||
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
|
||||
override val outlets: immutable.Seq[Outlet[_]] = List(out)
|
||||
|
||||
override def deepCopy(): SourceShape[T] = SourceShape(outlet.carbonCopy())
|
||||
override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy())
|
||||
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
|
||||
require(inlets.isEmpty, s"proposed inlets [${inlets.mkString(", ")}] do not fit SourceShape")
|
||||
require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit SourceShape")
|
||||
|
|
@ -254,11 +254,11 @@ object SourceShape {
|
|||
* outside like a pipe (but it can be a complex topology of streams within of
|
||||
* course).
|
||||
*/
|
||||
final case class FlowShape[-I, +O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]) extends Shape {
|
||||
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
|
||||
override val outlets: immutable.Seq[Outlet[_]] = List(outlet)
|
||||
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {
|
||||
override val inlets: immutable.Seq[Inlet[_]] = List(in)
|
||||
override val outlets: immutable.Seq[Outlet[_]] = List(out)
|
||||
|
||||
override def deepCopy(): FlowShape[I, O] = FlowShape(inlet.carbonCopy(), outlet.carbonCopy())
|
||||
override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy())
|
||||
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
|
||||
require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit FlowShape")
|
||||
require(outlets.size == 1, s"proposed outlets [${outlets.mkString(", ")}] do not fit FlowShape")
|
||||
|
|
@ -274,11 +274,11 @@ object FlowShape {
|
|||
/**
|
||||
* A Sink [[Shape]] has exactly one input and no outputs, it models a data sink.
|
||||
*/
|
||||
final case class SinkShape[-T](inlet: Inlet[T @uncheckedVariance]) extends Shape {
|
||||
override val inlets: immutable.Seq[Inlet[_]] = List(inlet)
|
||||
final case class SinkShape[-T](in: Inlet[T @uncheckedVariance]) extends Shape {
|
||||
override val inlets: immutable.Seq[Inlet[_]] = List(in)
|
||||
override val outlets: immutable.Seq[Outlet[_]] = EmptyImmutableSeq
|
||||
|
||||
override def deepCopy(): SinkShape[T] = SinkShape(inlet.carbonCopy())
|
||||
override def deepCopy(): SinkShape[T] = SinkShape(in.carbonCopy())
|
||||
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): Shape = {
|
||||
require(inlets.size == 1, s"proposed inlets [${inlets.mkString(", ")}] do not fit SinkShape")
|
||||
require(outlets.isEmpty, s"proposed outlets [${outlets.mkString(", ")}] do not fit SinkShape")
|
||||
|
|
@ -315,7 +315,7 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVa
|
|||
/**
|
||||
* Java API for creating from a pair of unidirectional flows.
|
||||
*/
|
||||
def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.inlet, top.outlet, bottom.inlet, bottom.outlet)
|
||||
def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.in, top.out, bottom.in, bottom.out)
|
||||
|
||||
override def deepCopy(): BidiShape[In1, Out1, In2, Out2] =
|
||||
BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy())
|
||||
|
|
@ -330,7 +330,7 @@ final case class BidiShape[-In1, +Out1, -In2, +Out2](in1: Inlet[In1 @uncheckedVa
|
|||
//#bidi-shape
|
||||
object BidiShape {
|
||||
def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] =
|
||||
BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet)
|
||||
BidiShape(top.in, top.out, bottom.in, bottom.out)
|
||||
|
||||
/** Java API */
|
||||
def of[In1, Out1, In2, Out2](in1: Inlet[In1 @uncheckedVariance],
|
||||
|
|
|
|||
|
|
@ -98,11 +98,11 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
|
|||
atomic match {
|
||||
case sink: SinkModule[_, _] ⇒
|
||||
val (sub, mat) = sink.create(newMaterializationContext())
|
||||
assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]])
|
||||
assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
|
||||
matVal.put(atomic, mat)
|
||||
case source: SourceModule[_, _] ⇒
|
||||
val (pub, mat) = source.create(newMaterializationContext())
|
||||
assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]])
|
||||
assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
|
||||
matVal.put(atomic, mat)
|
||||
|
||||
// FIXME: Remove this, only stream-of-stream ops need it
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out
|
|||
// This is okay since the only caller of this method is right below.
|
||||
protected def newInstance(shape: SourceShape[Out] @uncheckedVariance): SourceModule[Out, Mat]
|
||||
|
||||
override def carbonCopy: Module = newInstance(SourceShape(shape.outlet.carbonCopy()))
|
||||
override def carbonCopy: Module = newInstance(SourceShape(shape.out.carbonCopy()))
|
||||
|
||||
override def subModules: Set[Module] = Set.empty
|
||||
|
||||
|
|
@ -40,7 +40,7 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out
|
|||
val thatN = attr.nameOrDefault(null)
|
||||
|
||||
if ((thatN eq null) || thisN == thatN) shape
|
||||
else shape.copy(outlet = Outlet(thatN + ".out"))
|
||||
else shape.copy(out = Outlet(thatN + ".out"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
|
|||
// This is okay since we the only caller of this method is right below.
|
||||
protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]
|
||||
|
||||
override def carbonCopy: Module = newInstance(SinkShape(shape.inlet.carbonCopy()))
|
||||
override def carbonCopy: Module = newInstance(SinkShape(shape.in.carbonCopy()))
|
||||
|
||||
override def subModules: Set[Module] = Set.empty
|
||||
|
||||
|
|
@ -38,7 +38,7 @@ private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) exte
|
|||
val thatN = attr.nameOrDefault(null)
|
||||
|
||||
if ((thatN eq null) || thisN == thatN) shape
|
||||
else shape.copy(inlet = Inlet(thatN + ".in"))
|
||||
else shape.copy(in = Inlet(thatN + ".in"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -129,9 +129,9 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S
|
|||
val op = opsIterator.next().asInstanceOf[Stage[Any, Any]]
|
||||
val stage = new PushPullGraphStage((_) ⇒ op, Attributes.none)
|
||||
stages(i) = stage
|
||||
ins(i) = stage.shape.inlet
|
||||
ins(i) = stage.shape.in
|
||||
inOwners(i) = i
|
||||
outs(i + 1) = stage.shape.outlet
|
||||
outs(i + 1) = stage.shape.out
|
||||
outOwners(i + 1) = i
|
||||
i += 1
|
||||
}
|
||||
|
|
|
|||
|
|
@ -173,8 +173,8 @@ private[stream] object TcpConnectionStage {
|
|||
class TcpStreamLogic(val shape: FlowShape[ByteString, ByteString], val role: TcpRole) extends GraphStageLogic(shape) {
|
||||
implicit private var self: StageActorRef = _
|
||||
|
||||
private def bytesIn = shape.inlet
|
||||
private def bytesOut = shape.outlet
|
||||
private def bytesIn = shape.in
|
||||
private def bytesOut = shape.out
|
||||
private var connection: ActorRef = _
|
||||
|
||||
// No reading until role have been decided
|
||||
|
|
|
|||
|
|
@ -307,14 +307,14 @@ object GraphDSL extends GraphCreate {
|
|||
def materializedValue: Outlet[Mat @uncheckedVariance] = delegate.materializedValue
|
||||
|
||||
def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out)
|
||||
def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.outlet)
|
||||
def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.outlet)
|
||||
def from[T](src: SourceShape[T]): ForwardOps[T] = new ForwardOps(src.out)
|
||||
def from[I, O](f: FlowShape[I, O]): ForwardOps[O] = new ForwardOps(f.out)
|
||||
def from[I, O](j: UniformFanInShape[I, O]): ForwardOps[O] = new ForwardOps(j.out)
|
||||
def from[I, O](j: UniformFanOutShape[I, O]): ForwardOps[O] = new ForwardOps(findOut(delegate, j, 0))
|
||||
|
||||
def to[T](in: Inlet[T]): ReverseOps[T] = new ReverseOps(in)
|
||||
def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.inlet)
|
||||
def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.inlet)
|
||||
def to[T](dst: SinkShape[T]): ReverseOps[T] = new ReverseOps(dst.in)
|
||||
def to[I, O](f: FlowShape[I, O]): ReverseOps[I] = new ReverseOps(f.in)
|
||||
def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0))
|
||||
def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in)
|
||||
|
||||
|
|
|
|||
|
|
@ -170,7 +170,7 @@ object BidiFlow {
|
|||
flow1: Graph[FlowShape[I1, O1], M1],
|
||||
flow2: Graph[FlowShape[I2, O2], M2])(combine: (M1, M2) ⇒ M): BidiFlow[I1, O1, I2, O2, M] =
|
||||
fromGraph(GraphDSL.create(flow1, flow2)(combine) {
|
||||
implicit b ⇒ (f1, f2) ⇒ BidiShape(f1.inlet, f1.outlet, f2.inlet, f2.outlet)
|
||||
implicit b ⇒ (f1, f2) ⇒ BidiShape(f1.in, f1.out, f2.in, f2.out)
|
||||
})
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -46,8 +46,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
val flowCopy = flow.module.carbonCopy
|
||||
new Flow(
|
||||
module
|
||||
.fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine)
|
||||
.replaceShape(FlowShape(shape.inlet, flowCopy.shape.outlets.head)))
|
||||
.fuse(flowCopy, shape.out, flowCopy.shape.inlets.head, combine)
|
||||
.replaceShape(FlowShape(shape.in, flowCopy.shape.outlets.head)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -93,8 +93,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
val sinkCopy = sink.module.carbonCopy
|
||||
new Sink(
|
||||
module
|
||||
.fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)
|
||||
.replaceShape(SinkShape(shape.inlet)))
|
||||
.fuse(sinkCopy, shape.out, sinkCopy.shape.inlets.head, combine)
|
||||
.replaceShape(SinkShape(shape.in)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -136,8 +136,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
RunnableGraph(
|
||||
module
|
||||
.compose(flowCopy, combine)
|
||||
.wire(shape.outlet, flowCopy.shape.inlets.head)
|
||||
.wire(flowCopy.shape.outlets.head, shape.inlet))
|
||||
.wire(shape.out, flowCopy.shape.inlets.head)
|
||||
.wire(flowCopy.shape.outlets.head, shape.in))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -181,8 +181,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
val outs = copy.shape.outlets
|
||||
new Flow(module
|
||||
.compose(copy, combine)
|
||||
.wire(shape.outlet, ins.head)
|
||||
.wire(outs(1), shape.inlet)
|
||||
.wire(shape.out, ins.head)
|
||||
.wire(outs(1), shape.in)
|
||||
.replaceShape(FlowShape(ins(1), outs.head)))
|
||||
}
|
||||
|
||||
|
|
@ -191,14 +191,14 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
|
|||
private[stream] override def deprecatedAndThen[U](op: StageModule): Repr[U] = {
|
||||
//No need to copy here, op is a fresh instance
|
||||
if (this.isIdentity) new Flow(op).asInstanceOf[Repr[U]]
|
||||
else new Flow(module.fuse(op, shape.outlet, op.inPort).replaceShape(FlowShape(shape.inlet, op.outPort)))
|
||||
else new Flow(module.fuse(op, shape.out, op.inPort).replaceShape(FlowShape(shape.in, op.outPort)))
|
||||
}
|
||||
|
||||
// FIXME: Only exists to keep old stuff alive
|
||||
private[akka] def deprecatedAndThenMat[U, Mat2, O >: Out](processorFactory: () ⇒ (Processor[O, U], Mat2)): ReprMat[U, Mat2] = {
|
||||
val op = DirectProcessor(processorFactory.asInstanceOf[() ⇒ (Processor[Any, Any], Any)])
|
||||
if (this.isIdentity) new Flow(op).asInstanceOf[ReprMat[U, Mat2]]
|
||||
else new Flow[In, U, Mat2](module.fuse(op, shape.outlet, op.inPort, Keep.right).replaceShape(FlowShape(shape.inlet, op.outPort)))
|
||||
else new Flow[In, U, Mat2](module.fuse(op, shape.out, op.inPort, Keep.right).replaceShape(FlowShape(shape.in, op.outPort)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -289,7 +289,7 @@ object Flow {
|
|||
* Helper to create `Flow` from a `Sink`and a `Source`.
|
||||
*/
|
||||
def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(f: (M1, M2) ⇒ M): Flow[I, O, M] =
|
||||
fromGraph(GraphDSL.create(sink, source)(f) { implicit b ⇒ (in, out) ⇒ FlowShape(in.inlet, out.outlet) })
|
||||
fromGraph(GraphDSL.create(sink, source)(f) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) })
|
||||
}
|
||||
|
||||
object RunnableGraph {
|
||||
|
|
|
|||
|
|
@ -812,8 +812,8 @@ object GraphDSL extends GraphApply {
|
|||
|
||||
def ~>[Out](via: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out] = {
|
||||
val s = b.add(via)
|
||||
b.addEdge(importAndGetPort(b), s.inlet)
|
||||
s.outlet
|
||||
b.addEdge(importAndGetPort(b), s.in)
|
||||
s.out
|
||||
}
|
||||
|
||||
def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {
|
||||
|
|
@ -836,15 +836,15 @@ object GraphDSL extends GraphApply {
|
|||
}
|
||||
|
||||
def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {
|
||||
b.addEdge(importAndGetPort(b), flow.inlet)
|
||||
flow.outlet
|
||||
b.addEdge(importAndGetPort(b), flow.in)
|
||||
flow.out
|
||||
}
|
||||
|
||||
def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit =
|
||||
b.addEdge(importAndGetPort(b), b.add(to).inlet)
|
||||
b.addEdge(importAndGetPort(b), b.add(to).in)
|
||||
|
||||
def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =
|
||||
b.addEdge(importAndGetPort(b), to.inlet)
|
||||
b.addEdge(importAndGetPort(b), to.in)
|
||||
}
|
||||
|
||||
sealed trait ReverseCombinerBase[T] extends Any {
|
||||
|
|
@ -855,8 +855,8 @@ object GraphDSL extends GraphApply {
|
|||
|
||||
def <~[In](via: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In] = {
|
||||
val s = b.add(via)
|
||||
b.addEdge(s.outlet, importAndGetPortReverse(b))
|
||||
s.inlet
|
||||
b.addEdge(s.out, importAndGetPortReverse(b))
|
||||
s.in
|
||||
}
|
||||
|
||||
def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
|
||||
|
|
@ -879,15 +879,15 @@ object GraphDSL extends GraphApply {
|
|||
}
|
||||
|
||||
def <~[In](flow: FlowShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
|
||||
b.addEdge(flow.outlet, importAndGetPortReverse(b))
|
||||
flow.inlet
|
||||
b.addEdge(flow.out, importAndGetPortReverse(b))
|
||||
flow.in
|
||||
}
|
||||
|
||||
def <~(from: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit =
|
||||
b.addEdge(b.add(from).outlet, importAndGetPortReverse(b))
|
||||
b.addEdge(b.add(from).out, importAndGetPortReverse(b))
|
||||
|
||||
def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit =
|
||||
b.addEdge(from.outlet, importAndGetPortReverse(b))
|
||||
b.addEdge(from.out, importAndGetPortReverse(b))
|
||||
}
|
||||
|
||||
// Although Mat is always Unit, it cannot be removed as a type parameter, otherwise the "override type"
|
||||
|
|
@ -911,7 +911,7 @@ object GraphDSL extends GraphApply {
|
|||
|
||||
override private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U] = {
|
||||
b.deprecatedAndThen(outlet, op)
|
||||
new PortOpsImpl(op.shape.outlet.asInstanceOf[Outlet[U]], b)
|
||||
new PortOpsImpl(op.shape.out.asInstanceOf[Outlet[U]], b)
|
||||
}
|
||||
|
||||
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Closed =
|
||||
|
|
@ -944,33 +944,33 @@ object GraphDSL extends GraphApply {
|
|||
}
|
||||
|
||||
implicit final class SinkArrow[T](val s: Graph[SinkShape[T], _]) extends AnyVal with ReverseCombinerBase[T] {
|
||||
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s).inlet
|
||||
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s).in
|
||||
}
|
||||
|
||||
implicit final class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] {
|
||||
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = s.inlet
|
||||
override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = s.in
|
||||
}
|
||||
|
||||
implicit final class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] {
|
||||
override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet
|
||||
override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.in
|
||||
|
||||
def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
|
||||
val shape = b.add(bidi)
|
||||
b.addEdge(f.outlet, shape.in1)
|
||||
b.addEdge(shape.out2, f.inlet)
|
||||
b.addEdge(f.out, shape.in1)
|
||||
b.addEdge(shape.out2, f.in)
|
||||
shape
|
||||
}
|
||||
|
||||
def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
|
||||
b.addEdge(f.outlet, bidi.in1)
|
||||
b.addEdge(bidi.out2, f.inlet)
|
||||
b.addEdge(f.out, bidi.in1)
|
||||
b.addEdge(bidi.out2, f.in)
|
||||
bidi
|
||||
}
|
||||
|
||||
def <~>[M](flow: Graph[FlowShape[O, I], M])(implicit b: Builder[_]): Unit = {
|
||||
val shape = b.add(flow)
|
||||
b.addEdge(shape.outlet, f.inlet)
|
||||
b.addEdge(f.outlet, shape.inlet)
|
||||
b.addEdge(shape.out, f.in)
|
||||
b.addEdge(f.out, shape.in)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -978,23 +978,23 @@ object GraphDSL extends GraphApply {
|
|||
def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
|
||||
val shape = b.add(bidi)
|
||||
val flow = b.add(f)
|
||||
b.addEdge(flow.outlet, shape.in1)
|
||||
b.addEdge(shape.out2, flow.inlet)
|
||||
b.addEdge(flow.out, shape.in1)
|
||||
b.addEdge(shape.out2, flow.in)
|
||||
shape
|
||||
}
|
||||
|
||||
def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
|
||||
val flow = b.add(f)
|
||||
b.addEdge(flow.outlet, bidi.in1)
|
||||
b.addEdge(bidi.out2, flow.inlet)
|
||||
b.addEdge(flow.out, bidi.in1)
|
||||
b.addEdge(bidi.out2, flow.in)
|
||||
bidi
|
||||
}
|
||||
|
||||
def <~>[M2](flow: Graph[FlowShape[O, I], M2])(implicit b: Builder[_]): Unit = {
|
||||
val shape = b.add(flow)
|
||||
val ff = b.add(f)
|
||||
b.addEdge(shape.outlet, ff.inlet)
|
||||
b.addEdge(ff.outlet, shape.inlet)
|
||||
b.addEdge(shape.out, ff.in)
|
||||
b.addEdge(ff.out, shape.in)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1013,14 +1013,14 @@ object GraphDSL extends GraphApply {
|
|||
}
|
||||
|
||||
def <~>(flow: FlowShape[O1, I2])(implicit b: Builder[_]): Unit = {
|
||||
b.addEdge(bidi.out1, flow.inlet)
|
||||
b.addEdge(flow.outlet, bidi.in2)
|
||||
b.addEdge(bidi.out1, flow.in)
|
||||
b.addEdge(flow.out, bidi.in2)
|
||||
}
|
||||
|
||||
def <~>[M](f: Graph[FlowShape[O1, I2], M])(implicit b: Builder[_]): Unit = {
|
||||
val flow = b.add(f)
|
||||
b.addEdge(bidi.out1, flow.inlet)
|
||||
b.addEdge(flow.outlet, bidi.in2)
|
||||
b.addEdge(bidi.out1, flow.in)
|
||||
b.addEdge(flow.out, bidi.in2)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1033,14 +1033,14 @@ object GraphDSL extends GraphApply {
|
|||
new PortOpsImpl(findOut(b, j, 0), b)
|
||||
|
||||
implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O] =
|
||||
new PortOpsImpl(f.outlet, b)
|
||||
new PortOpsImpl(f.out, b)
|
||||
|
||||
implicit final class SourceArrow[T](val s: Graph[SourceShape[T], _]) extends AnyVal with CombinerBase[T] {
|
||||
override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s).outlet
|
||||
override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s).out
|
||||
}
|
||||
|
||||
implicit final class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {
|
||||
override def importAndGetPort(b: Builder[_]): Outlet[T] = s.outlet
|
||||
override def importAndGetPort(b: Builder[_]): Outlet[T] = s.out
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
val flowCopy = flow.module.carbonCopy
|
||||
new Source(
|
||||
module
|
||||
.fuse(flowCopy, shape.outlet, flowCopy.shape.inlets.head, combine)
|
||||
.fuse(flowCopy, shape.out, flowCopy.shape.inlets.head, combine)
|
||||
.replaceShape(SourceShape(flowCopy.shape.outlets.head)))
|
||||
}
|
||||
}
|
||||
|
|
@ -65,7 +65,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
*/
|
||||
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = {
|
||||
val sinkCopy = sink.module.carbonCopy
|
||||
RunnableGraph(module.fuse(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine))
|
||||
RunnableGraph(module.fuse(sinkCopy, shape.out, sinkCopy.shape.inlets.head, combine))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -79,7 +79,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
// No need to copy here, op is a fresh instance
|
||||
new Source(
|
||||
module
|
||||
.fuse(op, shape.outlet, op.inPort)
|
||||
.fuse(op, shape.out, op.inPort)
|
||||
.replaceShape(SourceShape(op.outPort)))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ private[stream] object AbstractStage {
|
|||
// No need to refer to the handle in a private val
|
||||
val handler = new InHandler with OutHandler {
|
||||
override def onPush(): Unit =
|
||||
try { currentStage.onPush(grab(shape.inlet), ctx) } catch { case NonFatal(ex) ⇒ onSupervision(ex) }
|
||||
try { currentStage.onPush(grab(shape.in), ctx) } catch { case NonFatal(ex) ⇒ onSupervision(ex) }
|
||||
|
||||
override def onPull(): Unit = currentStage.onPull(ctx)
|
||||
|
||||
|
|
@ -62,8 +62,8 @@ private[stream] object AbstractStage {
|
|||
override def onDownstreamFinish(): Unit = currentStage.onDownstreamFinish(ctx)
|
||||
}
|
||||
|
||||
setHandler(shape.inlet, handler)
|
||||
setHandler(shape.outlet, handler)
|
||||
setHandler(shape.in, handler)
|
||||
setHandler(shape.out, handler)
|
||||
}
|
||||
|
||||
private def onSupervision(ex: Throwable): Unit = {
|
||||
|
|
@ -81,22 +81,22 @@ private[stream] object AbstractStage {
|
|||
}
|
||||
|
||||
private def resetAfterSupervise(): Unit = {
|
||||
val mustPull = currentStage.isDetached || isAvailable(shape.outlet)
|
||||
if (!hasBeenPulled(shape.inlet) && mustPull) pull(shape.inlet)
|
||||
val mustPull = currentStage.isDetached || isAvailable(shape.out)
|
||||
if (!hasBeenPulled(shape.in) && mustPull) pull(shape.in)
|
||||
}
|
||||
|
||||
override protected[stream] def beforePreStart(): Unit = {
|
||||
super.beforePreStart()
|
||||
if (currentStage.isDetached) pull(shape.inlet)
|
||||
if (currentStage.isDetached) pull(shape.in)
|
||||
}
|
||||
|
||||
final override def push(elem: Out): DownstreamDirective = {
|
||||
push(shape.outlet, elem)
|
||||
push(shape.out, elem)
|
||||
null
|
||||
}
|
||||
|
||||
final override def pull(): UpstreamDirective = {
|
||||
pull(shape.inlet)
|
||||
pull(shape.in)
|
||||
null
|
||||
}
|
||||
|
||||
|
|
@ -106,7 +106,7 @@ private[stream] object AbstractStage {
|
|||
}
|
||||
|
||||
final override def pushAndFinish(elem: Out): DownstreamDirective = {
|
||||
push(shape.outlet, elem)
|
||||
push(shape.out, elem)
|
||||
completeStage()
|
||||
null
|
||||
}
|
||||
|
|
@ -116,10 +116,10 @@ private[stream] object AbstractStage {
|
|||
null
|
||||
}
|
||||
|
||||
final override def isFinishing: Boolean = isClosed(shape.inlet)
|
||||
final override def isFinishing: Boolean = isClosed(shape.in)
|
||||
|
||||
final override def absorbTermination(): TerminationDirective = {
|
||||
if (isClosed(shape.outlet)) {
|
||||
if (isClosed(shape.out)) {
|
||||
val ex = new UnsupportedOperationException("It is not allowed to call absorbTermination() from onDownstreamFinish.")
|
||||
// This MUST be logged here, since the downstream has cancelled, i.e. there is noone to send onError to, the
|
||||
// stage is just about to finish so noone will catch it anyway just the interpreter
|
||||
|
|
@ -127,29 +127,29 @@ private[stream] object AbstractStage {
|
|||
interpreter.log.error(ex.getMessage)
|
||||
throw ex // We still throw for correctness (although a finish() would also work here)
|
||||
}
|
||||
if (isAvailable(shape.outlet)) currentStage.onPull(ctx)
|
||||
if (isAvailable(shape.out)) currentStage.onPull(ctx)
|
||||
null
|
||||
}
|
||||
|
||||
override def pushAndPull(elem: Out): FreeDirective = {
|
||||
push(shape.outlet, elem)
|
||||
pull(shape.inlet)
|
||||
push(shape.out, elem)
|
||||
pull(shape.in)
|
||||
null
|
||||
}
|
||||
|
||||
final override def holdUpstreamAndPush(elem: Out): UpstreamDirective = {
|
||||
push(shape.outlet, elem)
|
||||
push(shape.out, elem)
|
||||
null
|
||||
}
|
||||
|
||||
final override def holdDownstreamAndPull(): DownstreamDirective = {
|
||||
pull(shape.inlet)
|
||||
pull(shape.in)
|
||||
null
|
||||
}
|
||||
|
||||
final override def isHoldingDownstream: Boolean = isAvailable(shape.outlet)
|
||||
final override def isHoldingDownstream: Boolean = isAvailable(shape.out)
|
||||
|
||||
final override def isHoldingUpstream: Boolean = !(isClosed(shape.inlet) || hasBeenPulled(shape.inlet))
|
||||
final override def isHoldingUpstream: Boolean = !(isClosed(shape.in) || hasBeenPulled(shape.in))
|
||||
|
||||
final override def holdDownstream(): DownstreamDirective = null
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue