diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst index a66eefb625..2e8f4db9e2 100644 --- a/akka-docs-dev/rst/java/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -206,6 +206,9 @@ resulting values. Some examples of using these combiners are illustrated in the .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-mat-combine +.. note:: + In Graphs it is possible to access the materialized value from inside the stream processing graph. For details see + :ref:`graph-matvalue-java` Stream ordering =============== diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst index 5fbb7716e1..8acea08be4 100644 --- a/akka-docs-dev/rst/java/stream-graphs.rst +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -194,6 +194,24 @@ together and also turned around with the ``.reversed()`` method. The test simulates both parties of a network communication protocol without actually having to open a network connection—the flows can just be connected directly. +.. _graph-matvalue-java: + +Accessing the materialized value inside the Graph +------------------------------------------------- + +In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a +Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that +can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value. +If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times +to acquire the necessary number of outlets. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue + +Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value. +The following example demonstrates a case where the materialized ``Future`` of a fold is fed back to the fold itself. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue-cycle + .. _graph-cycles-java: Graph cycles, liveness and deadlocks diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index f0e8ec0651..b5dbf068a3 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -8,7 +8,7 @@ import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec import scala.collection.immutable -import scala.concurrent.Await +import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ class FlowGraphDocSpec extends AkkaSpec { @@ -20,7 +20,7 @@ class FlowGraphDocSpec extends AkkaSpec { "build simple graph" in { //format: OFF //#simple-flow-graph - val g = FlowGraph.closed() { implicit builder: FlowGraph.Builder => + val g = FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] => import FlowGraph.Implicits._ val in = Source(1 to 10) val out = Sink.ignore @@ -43,7 +43,7 @@ class FlowGraphDocSpec extends AkkaSpec { "build simple graph without implicits" in { //#simple-flow-graph-no-implicits - val g = FlowGraph.closed() { builder: FlowGraph.Builder => + val g = FlowGraph.closed() { builder: FlowGraph.Builder[Unit] => val in = Source(1 to 10) val out = Sink.ignore @@ -219,4 +219,33 @@ class FlowGraphDocSpec extends AkkaSpec { } + "access to materialized value" in { + //#flow-graph-matvalue + import FlowGraph.Implicits._ + val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) { + implicit builder ⇒ + fold ⇒ + (fold.inlet, builder.matValue.mapAsync(identity).outlet) + } + //#flow-graph-matvalue + + Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55) + + //#flow-graph-matvalue-cycle + import FlowGraph.Implicits._ + // This cannot produce any value: + val cyclicFold: Source[Int, Future[Int]] = Source(Sink.fold[Int, Int](0)(_ + _)) { + implicit builder => + fold => + // - Fold cannot complete until its upstream mapAsync completes + // - mapAsync cannot complete until the materialized Future produced by + // fold completes + // As a result this Source will never emit anything, and its materialited + // Future will never complete + builder.matValue.mapAsync(identity) ~> fold + builder.matValue.mapAsync(identity).outlet + } + //#flow-graph-matvalue-cycle + } + } diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index eec1a26ea7..b604e8f4a5 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -210,6 +210,10 @@ resulting values. Some examples of using these combiners are illustrated in the .. includecode:: code/docs/stream/FlowDocSpec.scala#flow-mat-combine +.. note:: + In Graphs it is possible to access the materialized value from inside the stream processing graph. For details see + :ref:`graph-matvalue-scala` + Stream ordering =============== In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}`` diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index 9c8d10bd89..3952dd6c0d 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -248,6 +248,24 @@ together and also turned around with the ``.reversed`` method. The test simulates both parties of a network communication protocol without actually having to open a network connection—the flows can just be connected directly. +.. _graph-matvalue-scala: + +Accessing the materialized value inside the Graph +------------------------------------------------- + +In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a +Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that +can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value. +If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times +to acquire the necessary number of outlets. + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue + +Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value. +The following example demonstrates a case where the materialized ``Future`` of a fold is fed back to the fold itself. + +.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue-cycle + .. _graph-cycles-scala: Graph cycles, liveness and deadlocks diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MaterializedValuePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MaterializedValuePublisherTest.scala new file mode 100644 index 0000000000..2c6181d7b6 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MaterializedValuePublisherTest.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.impl.MaterializedValuePublisher +import org.reactivestreams.Publisher + +class MaterializedValuePublisherTest extends AkkaPublisherVerification[Any] { + + override def createPublisher(elements: Long): Publisher[Any] = { + val pub = new MaterializedValuePublisher() + + // it contains a value already + pub.setValue("Hello") + + pub + } + + override def maxElementsFromPublisher = 1 +} \ No newline at end of file diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala index 9fad368a6a..a5f7d419f1 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala @@ -17,13 +17,13 @@ abstract class TwoStreamsSetup extends AkkaSpec { type Outputs - abstract class Fixture(b: FlowGraph.Builder) { + abstract class Fixture(b: FlowGraph.Builder[_]) { def left: Inlet[Int] def right: Inlet[Int] def out: Outlet[Outputs] } - def fixture(b: FlowGraph.Builder): Fixture + def fixture(b: FlowGraph.Builder[_]): Fixture def setup(p1: Publisher[Int], p2: Publisher[Int]) = { val subscriber = StreamTestKit.SubscriberProbe[Outputs]() diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index a762b46fb8..57630b0a09 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -37,9 +37,9 @@ public class BidiFlowTest extends StreamTest { private final BidiFlow bidi = BidiFlow .factory() .create( - new Function>() { + new Function, BidiShape>() { @Override - public BidiShape apply(Builder b) + public BidiShape apply(Builder b) throws Exception { final FlowShape top = b.graph(Flow . empty().map(new Function() { @@ -63,9 +63,9 @@ public class BidiFlowTest extends StreamTest { private final BidiFlow inverse = BidiFlow .factory() .create( - new Function>() { + new Function, BidiShape>() { @Override - public BidiShape apply(Builder b) + public BidiShape apply(Builder b) throws Exception { final FlowShape top = b.graph(Flow. empty() .map(new Function() { @@ -90,9 +90,9 @@ public class BidiFlowTest extends StreamTest { .factory() .create( Sink. head(), - new Function2, BidiShape>() { + new Function2>, SinkShape, BidiShape>() { @Override - public BidiShape apply(Builder b, SinkShape sink) + public BidiShape apply(Builder> b, SinkShape sink) throws Exception { b.from(Source.single(42)).to(sink); final FlowShape top = b.graph(Flow @@ -130,9 +130,9 @@ public class BidiFlowTest extends StreamTest { .factory() .closed(Sink. head(), Sink. head(), Keep., Future> both(), - new Procedure3, SinkShape>() { + new Procedure3, Future>>, SinkShape, SinkShape>() { @Override - public void apply(Builder b, SinkShape st, + public void apply(Builder, Future>> b, SinkShape st, SinkShape sb) throws Exception { final BidiShape s = b .graph(bidi); @@ -198,12 +198,12 @@ public class BidiFlowTest extends StreamTest { final Future> result = Source.from(list).via(f).grouped(10).runWith(Sink.> head(), materializer); assertEquals(Arrays.asList("5", "6", "7"), Await.result(result, oneSec)); } - + @Test public void mustMaterializeToItsValue() throws Exception { - final Future f = FlowGraph.factory().closed(bidiMat, new Procedure2>() { + final Future f = FlowGraph.factory().closed(bidiMat, new Procedure2 >, BidiShape>() { @Override - public void apply(Builder b, + public void apply(Builder> b, BidiShape shape) throws Exception { final FlowShape left = b.graph(Flow. empty().map( new Function() { @@ -223,13 +223,13 @@ public class BidiFlowTest extends StreamTest { }).run(materializer); assertEquals((Integer) 42, Await.result(f, oneSec)); } - + @Test public void mustCombineMaterializationValues() throws Exception { final Flow> left = Flow.factory().create( - Sink. head(), new Function2, Pair, Outlet>>() { + Sink. head(), new Function2 >, SinkShape, Pair, Outlet>>() { @Override - public Pair, Outlet> apply(Builder b, + public Pair, Outlet> apply(Builder> b, SinkShape sink) throws Exception { final UniformFanOutShape bcast = b.graph(Broadcast. create(2)); final UniformFanInShape merge = b.graph(Merge. create(2)); @@ -247,9 +247,9 @@ public class BidiFlowTest extends StreamTest { } }); final Flow>> right = Flow.factory().create( - Sink.> head(), new Function2>, Pair, Outlet>>() { + Sink.> head(), new Function2>>, SinkShape>, Pair, Outlet>>() { @Override - public Pair, Outlet> apply(Builder b, + public Pair, Outlet> apply(Builder>> b, SinkShape> sink) throws Exception { final FlowShape> flow = b.graph(Flow. empty().grouped(10)); b.from(flow).to(sink); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java index 104d7ce543..33cd41f2e4 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java @@ -47,9 +47,9 @@ public class FlexiMergeTest { final Future> all = FlowGraph .factory() .closed(Sink.> head(), - new Procedure2>>() { + new Procedure2> >, SinkShape>>() { @Override - public void apply(Builder b, SinkShape> sink) + public void apply(Builder> > b, SinkShape> sink) throws Exception { final UniformFanInShape merge = b.graph(new Fair()); b.edge(b.source(in1), merge.in(0)); @@ -69,9 +69,9 @@ public class FlexiMergeTest { final Future> all = FlowGraph .factory() .closed(Sink.> head(), - new Procedure2>>() { + new Procedure2>>, SinkShape>>() { @Override - public void apply(Builder b, SinkShape> sink) + public void apply(Builder>> b, SinkShape> sink) throws Exception { final UniformFanInShape merge = b.graph(new StrictRoundRobin()); b.edge(b.source(in1), merge.in(0)); @@ -93,9 +93,9 @@ public class FlexiMergeTest { final Future>> all = FlowGraph .factory() .closed(Sink.>>head(), - new Procedure2>>>() { + new Procedure2>>>, SinkShape>>>() { @Override - public void apply(Builder b, SinkShape>> sink) + public void apply(Builder>>> b, SinkShape>> sink) throws Exception { final FanInShape2> zip = b.graph(new Zip()); b.edge(b.source(inA), zip.in0()); @@ -120,9 +120,9 @@ public class FlexiMergeTest { final Future>> all = FlowGraph .factory() .closed(Sink.>> head(), - new Procedure2>>>() { + new Procedure2>>>, SinkShape>>>() { @Override - public void apply(Builder b, SinkShape>> sink) + public void apply(Builder>>> b, SinkShape>> sink) throws Exception { final FanInShape3> zip = b.graph(new TripleZip()); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java index 85f78bf361..83a5dfb41b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java @@ -50,9 +50,9 @@ public class FlexiRouteTest { out1, out2, Keep.>, Future>> both(), - new Procedure3>, SinkShape>>() { + new Procedure3>, Future>>>, SinkShape>, SinkShape>>() { @Override - public void apply(Builder b, SinkShape> o1, + public void apply(Builder>, Future>>> b, SinkShape> o1, SinkShape> o2) throws Exception { final UniformFanOutShape fair = b.graph(new Fair()); b.edge(b.source(in), fair.in()); @@ -80,9 +80,9 @@ public class FlexiRouteTest { out1, out2, Keep.>, Future>> both(), - new Procedure3>, SinkShape>>() { + new Procedure3>, Future>>>, SinkShape>, SinkShape>>() { @Override - public void apply(Builder b, SinkShape> o1, + public void apply(Builder>, Future>>> b, SinkShape> o1, SinkShape> o2) throws Exception { final UniformFanOutShape robin = b.graph(new StrictRoundRobin()); b.edge(b.source(in), robin.in()); @@ -112,9 +112,9 @@ public class FlexiRouteTest { Sink.> head(), out2, Keep.>, Future>> both(), - new Procedure3>, SinkShape>>() { + new Procedure3>, Future>> >, SinkShape>, SinkShape>>() { @Override - public void apply(Builder b, SinkShape> o1, + public void apply(Builder>, Future>> > b, SinkShape> o1, SinkShape> o2) throws Exception { final FanOutShape2, Integer, String> unzip = b.graph(new Unzip()); final Outlet> src = b.source(Source.from(pairs)); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index a9219af409..83b4c62d8c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -16,6 +16,7 @@ import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -100,9 +101,9 @@ public class FlowGraphTest extends StreamTest { final Sink> publisher = Sink.publisher(); - final Source source = Source.factory().create(new Function>() { + final Source source = Source.factory().create(new Function, Outlet>() { @Override - public Outlet apply(Builder b) throws Exception { + public Outlet apply(Builder b) throws Exception { final UniformFanInShape merge = b.graph(Merge. create(2)); b.flow(b.source(in1), f1, merge.in(0)); b.flow(b.source(in2), f2, merge.in(1)); @@ -124,10 +125,10 @@ public class FlowGraphTest extends StreamTest { final Iterable input1 = Arrays.asList("A", "B", "C"); final Iterable input2 = Arrays.asList(1, 2, 3); - final Builder b = FlowGraph.builder(); + final Builder b = FlowGraph.builder(); final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); - final FanInShape2> zip = b.graph(Zip. create()); + final FanInShape2> zip = b.graph(Zip.create()); final Sink, Future> out = Sink .foreach(new Procedure>() { @Override @@ -144,7 +145,7 @@ public class FlowGraphTest extends StreamTest { List output = Arrays.asList(probe.receiveN(3)); @SuppressWarnings("unchecked") List> expected = Arrays.asList(new Pair("A", 1), new Pair( - "B", 2), new Pair("C", 3)); + "B", 2), new Pair("C", 3)); assertEquals(expected, output); } @@ -160,9 +161,9 @@ public class FlowGraphTest extends StreamTest { final Iterable expected1 = Arrays.asList("A", "B", "C"); final Iterable expected2 = Arrays.asList(1, 2, 3); - final Builder b = FlowGraph.builder(); + final Builder b = FlowGraph.builder(); final Outlet> in = b.source(Source.from(input)); - final FanOutShape2, String, Integer> unzip = b.graph(Unzip. create()); + final FanOutShape2, String, Integer> unzip = b.graph(Unzip.create()); final Sink> out1 = Sink.foreach(new Procedure() { @Override @@ -200,9 +201,9 @@ public class FlowGraphTest extends StreamTest { } }); - final Future future = FlowGraph.factory().closed(Sink. head(), new Procedure2>() { + final Future future = FlowGraph.factory().closed(Sink. head(), new Procedure2 >, SinkShape>() { @Override - public void apply(Builder b, SinkShape out) throws Exception { + public void apply(Builder > b, SinkShape out) throws Exception { final FanInShape2 zip = b.graph(sumZip); b.edge(b.source(in1), zip.in0()); b.edge(b.source(in2), zip.in1()); @@ -215,22 +216,22 @@ public class FlowGraphTest extends StreamTest { } @Test - public void mustBeAbleToUseZip4With() throws Exception { + public void mustBeAbleToUseZip4With() throws Exception { final Source in1 = Source.single(1); final Source in2 = Source.single(10); final Source in3 = Source.single(100); final Source in4 = Source.single(1000); final Graph, BoxedUnit> sumZip = ZipWith.create( - new Function4() { - @Override public Integer apply(Integer i1, Integer i2, Integer i3, Integer i4) throws Exception { - return i1 + i2 + i3 + i4; - } - }); - - final Future future = FlowGraph.factory().closed(Sink. head(), new Procedure2>() { + new Function4() { + @Override public Integer apply(Integer i1, Integer i2, Integer i3, Integer i4) throws Exception { + return i1 + i2 + i3 + i4; + } + }); + + final Future future = FlowGraph.factory().closed(Sink. head(), new Procedure2>, SinkShape>() { @Override - public void apply(Builder b, SinkShape out) throws Exception { + public void apply(Builder> b, SinkShape out) throws Exception { final FanInShape4 zip = b.graph(sumZip); b.edge(b.source(in1), zip.in0()); b.edge(b.source(in2), zip.in1()); @@ -244,4 +245,30 @@ public class FlowGraphTest extends StreamTest { assertEquals(1111, (int) result); } + @Test + public void mustBeAbleToUseMatValue() throws Exception { + final Source in1 = Source.single(1); + final TestProbe probe = TestProbe.apply(system); + + final Future future = FlowGraph.factory().closed(Sink. head(), new Procedure2>, SinkShape>() { + @Override + public void apply(Builder> b, SinkShape out) throws Exception { + b.from(Source.single(1)).to(out); + b.from(b.matValue()).to(Sink.foreach(new Procedure>(){ + public void apply(Future mat) throws Exception { + probe.ref().tell(mat, ActorRef.noSender()); + } + })); + } + }).run(materializer); + + final Integer result = Await.result(future, Duration.create(300, TimeUnit.MILLISECONDS)); + assertEquals(1, (int) result); + + final Future future2 = probe.expectMsgClass(Future.class); + + final Integer result2 = Await.result(future2, Duration.create(300, TimeUnit.MILLISECONDS)); + assertEquals(1, (int) result2); + } + } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index b5e87addd0..eaa48161dc 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -280,9 +280,9 @@ public class FlowTest extends StreamTest { final Sink> publisher = Sink.publisher(); - final Source source = Source.factory().create(new Function>() { + final Source source = Source.factory().create(new Function, Outlet>() { @Override - public Outlet apply(Builder b) throws Exception { + public Outlet apply(Builder b) throws Exception { final UniformFanInShape merge = b.graph(Merge. create(2)); b.flow(b.source(in1), f1, merge.in(0)); b.flow(b.source(in2), f2, merge.in(1)); @@ -304,7 +304,7 @@ public class FlowTest extends StreamTest { final Iterable input1 = Arrays.asList("A", "B", "C"); final Iterable input2 = Arrays.asList(1, 2, 3); - final Builder b = FlowGraph.builder(); + final Builder b = FlowGraph.builder(); final Outlet in1 = b.source(Source.from(input1)); final Outlet in2 = b.source(Source.from(input2)); final FanInShape2> zip = b.graph(Zip. create()); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index 254fb72c59..5f0d2add03 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -14,7 +14,7 @@ class GraphConcatSpec extends TwoStreamsSetup { override type Outputs = Int - override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { + override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) { val concat = b add Concat[Outputs]() override def left: Inlet[Outputs] = concat.in(0) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala new file mode 100644 index 0000000000..397dd6f814 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ + +class GraphMatValueSpec extends AkkaSpec { + + val settings = ActorFlowMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val materializer = ActorFlowMaterializer(settings) + + import FlowGraph.Implicits._ + + "A Graph with materialized value" must { + + val foldSink = Sink.fold[Int, Int](0)(_ + _) + + "expose the materialized value as source" in { + val sub = StreamTestKit.SubscriberProbe[Int]() + val f = FlowGraph.closed(foldSink) { implicit b ⇒ + fold ⇒ + Source(1 to 10) ~> fold + b.matValue.mapAsync(identity) ~> Sink(sub) + }.run() + + val r1 = Await.result(f, 3.seconds) + sub.expectSubscription().request(1) + val r2 = sub.expectNext() + + r1 should ===(r2) + } + + "expose the materialized value as source multiple times" in { + val sub = StreamTestKit.SubscriberProbe[Int]() + + val f = FlowGraph.closed(foldSink) { implicit b ⇒ + fold ⇒ + val zip = b.add(ZipWith[Int, Int, Int](_ + _)) + Source(1 to 10) ~> fold + b.matValue.mapAsync(identity) ~> zip.in0 + b.matValue.mapAsync(identity) ~> zip.in1 + + zip.out ~> Sink(sub) + }.run() + + val r1 = Await.result(f, 3.seconds) + sub.expectSubscription().request(1) + val r2 = sub.expectNext() + + r1 should ===(r2 / 2) + } + + // Exposes the materialized value as a stream value + val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source(foldSink) { implicit b ⇒ + fold ⇒ + Source(1 to 10) ~> fold + b.matValue + } + + "allow exposing the materialized value as port" in { + val (f1, f2) = foldFeedbackSource.mapAsync(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run() + Await.result(f1, 3.seconds) should ===(55) + Await.result(f2, 3.seconds) should ===(155) + } + + "allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in { + val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(identity).map(_ + 100).mapMaterialized((_) ⇒ ()) + Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155) + } + + "work properly with nesting and reusing" in { + val compositeSource1 = Source(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b ⇒ + (s1, s2) ⇒ + val zip = b.add(ZipWith[Int, Int, Int](_ + _)) + + s1.outlet.mapAsync(identity) ~> zip.in0 + s2.outlet.mapAsync(identity).map(_ * 100) ~> zip.in1 + zip.out + } + + val compositeSource2 = Source(compositeSource1, compositeSource1)(Keep.both) { implicit b ⇒ + (s1, s2) ⇒ + val zip = b.add(ZipWith[Int, Int, Int](_ + _)) + + s1.outlet ~> zip.in0 + s2.outlet.map(_ * 10000) ~> zip.in1 + zip.out + } + + val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run() + + Await.result(result, 3.seconds) should ===(55555555) + Await.result(f1, 3.seconds) should ===(55) + Await.result(f2, 3.seconds) should ===(55) + Await.result(f3, 3.seconds) should ===(55) + Await.result(f4, 3.seconds) should ===(55) + + } + + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala index 00aba2a197..2c26920857 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala @@ -14,7 +14,7 @@ class GraphMergeSpec extends TwoStreamsSetup { override type Outputs = Int - override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { + override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) { val merge = b add Merge[Outputs](2) override def left: Inlet[Outputs] = merge.in(0) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala index 3866592d64..61227bfae3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPreferredMergeSpec.scala @@ -14,7 +14,7 @@ class GraphPreferredMergeSpec extends TwoStreamsSetup { override type Outputs = Int - override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { + override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) { val merge = b.add(MergePreferred[Outputs](1)) override def left: Inlet[Outputs] = merge.preferred diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala index 27acd20a19..8c997fadda 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala @@ -12,7 +12,7 @@ class GraphZipSpec extends TwoStreamsSetup { override type Outputs = (Int, Int) - override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { + override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) { val zip = b.add(Zip[Int, Int]()) override def left: Inlet[Int] = zip.in0 diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala index 1759030e44..08c3f7441b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala @@ -10,7 +10,7 @@ class GraphZipWithSpec extends TwoStreamsSetup { override type Outputs = Int - override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { + override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) { val zip = b.add(ZipWith((_: Int) + (_: Int))) override def left: Inlet[Int] = zip.in0 override def right: Inlet[Int] = zip.in1 diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template index 0099ddb96a..eb8641d4a8 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/BidiFlowCreate.scala.template @@ -13,14 +13,14 @@ trait BidiFlowCreate { import language.implicitConversions private implicit def p[A, B](pair: Pair[A, B]): (A, B) = pair.first -> pair.second - def create[I1, O1, I2, O2](block: japi.Function[FlowGraph.Builder, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] = + def create[I1, O1, I2, O2](block: japi.Function[FlowGraph.Builder[Unit], BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] = new BidiFlow(scaladsl.BidiFlow() { b ⇒ block.apply(b.asJava) }) - def create[I1, O1, I2, O2, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] = + def create[I1, O1, I2, O2, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] = new BidiFlow(scaladsl.BidiFlow(g1) { b ⇒ s => block.apply(b.asJava, s) }) [2..21#def create[I##1, O##1, I##2, O##2, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], - block: japi.Function2[FlowGraph.Builder, [#S1#], BidiShape[I##1, O##1, I##2, O##2]]): BidiFlow[I##1, O##1, I##2, O##2, M] = + block: japi.Function2[FlowGraph.Builder[M], [#S1#], BidiShape[I##1, O##1, I##2, O##2]]): BidiFlow[I##1, O##1, I##2, O##2, M] = new BidiFlow(scaladsl.BidiFlow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# ] diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template index 3b7b555f05..64ca7ddd32 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/FlowCreate.scala.template @@ -13,14 +13,14 @@ trait FlowCreate { import language.implicitConversions private implicit def p[A, B](pair: Pair[A, B]): (A, B) = pair.first -> pair.second - def create[I, O](block: japi.Function[FlowGraph.Builder, Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] = + def create[I, O](block: japi.Function[FlowGraph.Builder[Unit], Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] = new Flow(scaladsl.Flow() { b ⇒ block.apply(b.asJava) }) - def create[I, O, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = + def create[I, O, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = new Flow(scaladsl.Flow(g1) { b ⇒ s => block.apply(b.asJava, s) }) [2..21#def create[I, O, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], - block: japi.Function2[FlowGraph.Builder, [#S1#], Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = + block: japi.Function2[FlowGraph.Builder[M], [#S1#], Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = new Flow(scaladsl.Flow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# ] diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template index 1a3ff144d1..37cf4a82b1 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template @@ -12,24 +12,24 @@ trait GraphCreate { import language.implicitConversions private implicit def r[M](run: scaladsl.RunnableFlow[M]): RunnableFlow[M] = new RunnableFlowAdapter(run) - def closed(block: japi.Procedure[FlowGraph.Builder]): RunnableFlow[Unit] = + def closed(block: japi.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] = scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) } - def partial[S <: Shape](block: japi.Function[FlowGraph.Builder, S]): Graph[S, Unit] = + def partial[S <: Shape](block: japi.Function[FlowGraph.Builder[Unit], S]): Graph[S, Unit] = scaladsl.FlowGraph.partial() { b ⇒ block.apply(b.asJava) } - def closed[S1 <: Shape, M](g1: Graph[S1, M], block: japi.Procedure2[FlowGraph.Builder, S1]): RunnableFlow[M] = + def closed[S1 <: Shape, M](g1: Graph[S1, M], block: japi.Procedure2[FlowGraph.Builder[M], S1]): RunnableFlow[M] = scaladsl.FlowGraph.closed(g1) { b ⇒ s => block.apply(b.asJava, s) } - def partial[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M], block: japi.Function2[FlowGraph.Builder, S1, S]): Graph[S, M] = + def partial[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M], block: japi.Function2[FlowGraph.Builder[M], S1, S]): Graph[S, M] = scaladsl.FlowGraph.partial(g1) { b ⇒ s => block.apply(b.asJava, s) } [2..21#def closed[[#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], - block: japi.Procedure2[FlowGraph.Builder, [#S1#]]): RunnableFlow[M] = + block: japi.Procedure2[FlowGraph.Builder[M], [#S1#]]): RunnableFlow[M] = scaladsl.FlowGraph.closed([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) } def partial[[#S1 <: Shape#], S <: Shape, [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], - block: japi.Function2[FlowGraph.Builder, [#S1#], S]): Graph[S, M] = + block: japi.Function2[FlowGraph.Builder[M], [#S1#], S]): Graph[S, M] = scaladsl.FlowGraph.partial([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }# ] diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template index d29b28ced1..fa33ff0a54 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SinkCreate.scala.template @@ -13,18 +13,18 @@ trait SinkCreate { * Creates a `Sink` by using a `FlowGraph.Builder` on a block that expects * a [[FlowGraph.Builder]] and returns the `UndefinedSource`. */ - def create[T](block: japi.Function[FlowGraph.Builder, Inlet[T]]): Sink[T, Unit] = + def create[T](block: japi.Function[FlowGraph.Builder[Unit], Inlet[T]]): Sink[T, Unit] = new Sink(scaladsl.Sink() { b ⇒ block.apply(b.asJava) }) /** * Creates a `Sink` by using a `FlowGraph.Builder` on a block that expects * a [[FlowGraph.Builder]] and returns the `UndefinedSource`. */ - def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Inlet[T]]): Sink[T, M] = + def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Inlet[T]]): Sink[T, M] = new Sink(scaladsl.Sink(g1) { b ⇒ s => block.apply(b.asJava, s) }) [2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], - block: japi.Function2[FlowGraph.Builder, [#S1#], Inlet[T]]): Sink[T, M] = + block: japi.Function2[FlowGraph.Builder[M], [#S1#], Inlet[T]]): Sink[T, M] = new Sink(scaladsl.Sink([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# ] diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template index b000d1b868..e12a470106 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/SourceCreate.scala.template @@ -9,14 +9,14 @@ import akka.stream.scaladsl.JavaConverters._ trait SourceCreate { - def create[T](block: japi.Function[FlowGraph.Builder, Outlet[T]]): Source[T, Unit] = + def create[T](block: japi.Function[FlowGraph.Builder[Unit], Outlet[T]]): Source[T, Unit] = new Source(scaladsl.Source() { b ⇒ block.apply(b.asJava) }) - def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Outlet[T]]): Source[T, M] = + def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Outlet[T]]): Source[T, M] = new Source(scaladsl.Source(g1) { b ⇒ s => block.apply(b.asJava, s) }) [2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], - block: japi.Function2[FlowGraph.Builder, [#S1#], Outlet[T]]): Source[T, M] = + block: japi.Function2[FlowGraph.Builder[M], [#S1#], Outlet[T]]): Source[T, M] = new Source(scaladsl.Source([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# ] diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template index 9cfbb815ea..7321876088 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/BidiFlowApply.scala.template @@ -7,13 +7,13 @@ import akka.stream.{ Shape, Inlet, Outlet, Graph, BidiShape } trait BidiFlowApply { - def apply[I1, O1, I2, O2]()(block: FlowGraph.Builder ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = { + def apply[I1, O1, I2, O2]()(block: FlowGraph.Builder[Unit] ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = { val builder = new FlowGraph.Builder val shape = block(builder) builder.buildBidiFlow(shape) } - def apply[I1, O1, I2, O2, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { + def apply[I1, O1, I2, O2, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { val builder = new FlowGraph.Builder val p = builder.add(g1, Keep.right) val shape = buildBlock(builder)(p) @@ -21,7 +21,7 @@ trait BidiFlowApply { } [2..#def apply[I##1, O##1, I##2, O##2, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)( - buildBlock: FlowGraph.Builder => ([#g1.Shape#]) ⇒ BidiShape[I##1, O##1, I##2, O##2]): BidiFlow[I##1, O##1, I##2, O##2, Mat] = { + buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ BidiShape[I##1, O##1, I##2, O##2]): BidiFlow[I##1, O##1, I##2, O##2, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template index f91b0f1566..29713362ed 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/FlowApply.scala.template @@ -7,13 +7,13 @@ import akka.stream.{ Shape, Inlet, Outlet, Graph } trait FlowApply { - def apply[I, O]()(block: FlowGraph.Builder ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit] = { + def apply[I, O]()(block: FlowGraph.Builder[Unit] ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit] = { val builder = new FlowGraph.Builder val (inlet, outlet) = block(builder) builder.buildFlow(inlet, outlet) } - def apply[I, O, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { + def apply[I, O, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { val builder = new FlowGraph.Builder val p = builder.add(g1, Keep.right) val (inlet, outlet) = buildBlock(builder)(p) @@ -21,7 +21,7 @@ trait FlowApply { } [2..#def apply[I, O, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)( - buildBlock: FlowGraph.Builder => ([#g1.Shape#]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { + buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index a7cc5b801e..a265dea46e 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -8,20 +8,20 @@ import akka.stream.{ Graph, Shape } trait GraphApply { - def closed()(buildBlock: (FlowGraph.Builder) ⇒ Unit): RunnableFlow[Unit] = { + def closed()(buildBlock: (FlowGraph.Builder[Unit]) ⇒ Unit): RunnableFlow[Unit] = { val builder = new FlowGraph.Builder buildBlock(builder) builder.buildRunnable() } - def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder ⇒ (g1.Shape) ⇒ Unit): RunnableFlow[Mat] = { + def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ Unit): RunnableFlow[Mat] = { val builder = new FlowGraph.Builder val p1 = builder.add(g1) buildBlock(builder)(p1) builder.buildRunnable() } - def partial[S <: Shape]()(buildBlock: FlowGraph.Builder ⇒ S): Graph[S, Unit] = { + def partial[S <: Shape]()(buildBlock: FlowGraph.Builder[Unit] ⇒ S): Graph[S, Unit] = { val builder = new FlowGraph.Builder val s = buildBlock(builder) val mod = builder.module.wrap().replaceShape(s) @@ -32,7 +32,7 @@ trait GraphApply { } } - def partial[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder ⇒ (g1.Shape) ⇒ S): Graph[S, Mat] = { + def partial[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ S): Graph[S, Mat] = { val builder = new FlowGraph.Builder val s1 = builder.add(g1) val s = buildBlock(builder)(s1) @@ -46,7 +46,7 @@ trait GraphApply { - [2..#def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = { + [2..#def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) @@ -58,7 +58,7 @@ trait GraphApply { ] - [2..#def partial[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ S): Graph[S, Mat] = { + [2..#def partial[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ S): Graph[S, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template index e371f573c8..8e7eb0f48e 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SinkApply.scala.template @@ -7,13 +7,13 @@ import akka.stream.{ Inlet, Graph, Shape } trait SinkApply { - def apply[In]()(buildBlock: FlowGraph.Builder => Inlet[In]): Sink[In, Unit] = { + def apply[In]()(buildBlock: FlowGraph.Builder[Unit] => Inlet[In]): Sink[In, Unit] = { val builder = new FlowGraph.Builder val inlet = buildBlock(builder) builder.buildSink(inlet) } - def apply[In, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) => Inlet[In]): Sink[In, Mat] = { + def apply[In, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Inlet[In]): Sink[In, Mat] = { val builder = new FlowGraph.Builder val s = builder.add(g1, Keep.right) val inlet = buildBlock(builder)(s) @@ -21,7 +21,7 @@ trait SinkApply { } [2..#def apply[In, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)( - buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ Inlet[In]): Sink[In, Mat] = { + buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Inlet[In]): Sink[In, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template index 085c7ff7ff..b2c3c1c7f1 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/SourceApply.scala.template @@ -7,13 +7,13 @@ import akka.stream.{ Outlet, Shape, Graph } trait SourceApply { - def apply[Out]()(buildBlock: FlowGraph.Builder => Outlet[Out]): Source[Out, Unit] = { + def apply[Out]()(buildBlock: FlowGraph.Builder[Unit] => Outlet[Out]): Source[Out, Unit] = { val builder = new FlowGraph.Builder val port = buildBlock(builder) builder.buildSource(port) } - def apply[Out, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) => Outlet[Out]): Source[Out, Mat] = { + def apply[Out, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Outlet[Out]): Source[Out, Mat] = { val builder = new FlowGraph.Builder val p = builder.add(g1, Keep.right) val port = buildBlock(builder)(p) @@ -21,7 +21,7 @@ trait SourceApply { } [2..#def apply[Out, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)( - buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ Outlet[Out]): Source[Out, Mat] = { + buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Outlet[Out]): Source[Out, Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index d6c24aa87a..5e5bb6640a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ package akka.stream.impl import scala.util.control.NonFatal diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 53d568d779..410c27b845 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -3,11 +3,15 @@ */ package akka.stream.impl +import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference } + +import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.{ Keep, OperationAttributes } import akka.stream._ import org.reactivestreams.{ Subscription, Publisher, Subscriber } import akka.event.Logging.simpleName import scala.collection.mutable +import scala.util.control.NonFatal /** * INTERNAL API @@ -310,6 +314,148 @@ private[stream] class VirtualPublisher[T]() extends Publisher[T] { override def subscribe(s: Subscriber[_ >: T]): Unit = realPublisher.subscribe(s) } +/** + * INTERNAL API + */ +private[stream] case class MaterializedValueSource[M]( + shape: SourceShape[M] = SourceShape[M](new Outlet[M]("Materialized.out")), + attributes: OperationAttributes = OperationAttributes.name("Materialized")) extends StreamLayout.Module { + + override def subModules: Set[Module] = Set.empty + override def withAttributes(attr: OperationAttributes): Module = this.copy(shape = amendShape(attr), attributes = attr) + override def carbonCopy: Module = this.copy(shape = SourceShape(new Outlet[M]("Materialized.out"))) + + override def replaceShape(s: Shape): Module = + if (s == shape) this + else throw new UnsupportedOperationException("cannot replace the shape of MaterializedValueSource") + + def amendShape(attr: OperationAttributes): SourceShape[M] = { + attr.nameOption match { + case None ⇒ shape + case s: Some[String] if s == attributes.nameOption ⇒ shape + case Some(name) ⇒ shape.copy(outlet = new Outlet(name + ".out")) + } + } + +} + +/** + * INTERNAL API + */ +private[stream] object MaterializedValuePublisher { + final val NotRequested = 0 + final val Requested = 1 + final val Completed = 2 + + final val NoValue = new AnyRef +} + +/** + * INTERNAL API + */ +private[stream] class MaterializedValuePublisher extends Publisher[Any] { + import MaterializedValuePublisher._ + + private val value = new AtomicReference[AnyRef](NoValue) + private val registeredSubscriber = new AtomicReference[Subscriber[_ >: Any]](null) + private val requestState = new AtomicInteger(NotRequested) + + private def close(): Unit = { + requestState.set(Completed) + value.set(NoValue) + registeredSubscriber.set(null) + } + + private def tryOrClose(block: ⇒ Unit): Unit = { + try block catch { + case v: ReactiveStreamsCompliance.SpecViolation ⇒ + close() + // What else can we do here? + case NonFatal(e) ⇒ + val sub = registeredSubscriber.get() + if ((sub ne null) && + requestState.compareAndSet(NotRequested, Completed) || requestState.compareAndSet(Requested, Completed)) { + sub.onError(e) + } + close() + throw e + } + } + + def setValue(m: Any): Unit = + tryOrClose { + if (value.compareAndSet(NoValue, m.asInstanceOf[AnyRef]) && requestState.get() == Requested) + pushAndClose(m) + } + + /* + * Both call-sites do a CAS on their "own" side and a GET on the other side. The possible overlaps + * are (removing symmetric cases where you can relabel A->B, B->A): + * + * A-CAS + * A-GET + * B-CAS + * B-GET - pushAndClose fires here + * + * A-CAS + * B-CAS + * A-GET - pushAndClose fires here + * B-GET - pushAndClose fires here + * + * A-CAS + * B-CAS + * B-GET - pushAndClose fires here + * A-GET - pushAndClose fires here + * + * The proof that there are no cases: + * + * - all permutations of 4 operations are 4! = 24 + * - the operations of A and B are cannot be reordered, so there are 24 / (2 * 2) = 6 actual orderings + * - if we don't count cases which are a simple relabeling A->B, B->A, we get 6 / 2 = 3 reorderings + * which are all enumerated above. + * + * pushAndClose protects against double onNext by doing a CAS itself. + */ + private def pushAndClose(m: Any): Unit = { + if (requestState.compareAndSet(Requested, Completed)) { + val sub = registeredSubscriber.get() + ReactiveStreamsCompliance.tryOnNext(sub, m) + ReactiveStreamsCompliance.tryOnComplete(sub) + close() + } + } + + override def subscribe(subscriber: Subscriber[_ >: Any]): Unit = { + tryOrClose { + ReactiveStreamsCompliance.requireNonNullSubscriber(subscriber) + if (registeredSubscriber.compareAndSet(null, subscriber)) { + ReactiveStreamsCompliance.tryOnSubscribe(subscriber, new Subscription { + override def cancel(): Unit = close() + + override def request(n: Long): Unit = { + if (n <= 0) { + ReactiveStreamsCompliance.tryOnError( + subscriber, + ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException) + } else { + if (requestState.compareAndSet(NotRequested, Requested)) { + val m = value.get() + if (m ne NoValue) pushAndClose(m) + } + } + } + }) + } else { + if (subscriber == registeredSubscriber.get()) + ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber) + else + ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "MaterializedValuePublisher") + } + } + } + +} + /** * INTERNAL API */ @@ -332,12 +478,25 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo protected def materializeModule(module: Module, effectiveAttributes: OperationAttributes): Any = { val materializedValues = collection.mutable.HashMap.empty[Module, Any] + var materializedValuePublishers: List[MaterializedValuePublisher] = Nil + for (submodule ← module.subModules) { val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes) - if (submodule.isAtomic) materializedValues.put(submodule, materializeAtomic(submodule, subEffectiveAttributes)) - else materializedValues.put(submodule, materializeComposite(submodule, subEffectiveAttributes)) + submodule match { + case mv: MaterializedValueSource[_] ⇒ + val pub = new MaterializedValuePublisher + materializedValuePublishers ::= pub + assignPort(mv.shape.outlet, pub) + case atomic if atomic.isAtomic ⇒ + materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes)) + case composite ⇒ + materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes)) + } } - resolveMaterialized(module.materializedValueComputation, materializedValues) + + val mat = resolveMaterialized(module.materializedValueComputation, materializedValues) + materializedValuePublishers foreach { pub ⇒ pub.setValue(mat) } + mat } protected def materializeComposite(composite: Module, effectiveAttributes: OperationAttributes): Any = { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 8c59b73d86..85e3e5eb73 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -4,7 +4,6 @@ package akka.stream.javadsl import akka.stream._ -import akka.stream.scaladsl import akka.japi.Pair /** @@ -278,10 +277,10 @@ object FlowGraph { * The [[Builder]] is mutable and not thread-safe, * thus you should construct your Graph and then share the constructed immutable [[FlowGraph]]. */ - def builder(): Builder = new Builder()(new scaladsl.FlowGraph.Builder) + def builder[M](): Builder[M] = new Builder()(new scaladsl.FlowGraph.Builder[M]) - class Builder()(private implicit val delegate: scaladsl.FlowGraph.Builder) { self ⇒ - import scaladsl.FlowGraph.Implicits._ + final class Builder[Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self ⇒ + import akka.stream.scaladsl.FlowGraph.Implicits._ def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to) @@ -298,6 +297,22 @@ object FlowGraph { def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala) + /** + * Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized + * this outlet will emit exactly one element which is the materialized value. It is possible to expose this + * outlet as an externally accessible outlet of a [[Source]], [[Sink]], [[Flow]] or [[BidiFlow]]. + * + * It is possible to call this method multiple times to get multiple [[Outlet]] instances if necessary. All of + * the outlets will emit the materialized value. + * + * Be careful to not to feed the result of this outlet to a stage that produces the materialized value itself (for + * example to a [[Sink#fold]] that contributes to the materialized value) since that might lead to an unresolvable + * dependency cycle. + * + * @return The outlet that will emit the materialized value. + */ + def matValue: Outlet[Mat] = delegate.matValue + def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat) def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out) @@ -314,26 +329,27 @@ object FlowGraph { def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in) - class ForwardOps[T](out: Outlet[T]) { - def to(in: Inlet[T]): Builder = { out ~> in; self } - def to[M](dst: Sink[T, M]): Builder = { out ~> dst.asScala; self } - def to(dst: SinkShape[T]): Builder = { out ~> dst; self } - def to[U](f: FlowShape[T, U]): Builder = { out ~> f; self } - def to[U](j: UniformFanInShape[T, U]): Builder = { out ~> j; self } - def to[U](j: UniformFanOutShape[T, U]): Builder = { out ~> j; self } + final class ForwardOps[T](out: Outlet[T]) { + def to(in: Inlet[T]): Builder[Mat] = { out ~> in; self } + def to[M](dst: Sink[T, M]): Builder[Mat] = { out ~> dst.asScala; self } + def to(dst: SinkShape[T]): Builder[Mat] = { out ~> dst; self } + def to[U](f: FlowShape[T, U]): Builder[Mat] = { out ~> f; self } + def to[U](j: UniformFanInShape[T, U]): Builder[Mat] = { out ~> j; self } + def to[U](j: UniformFanOutShape[T, U]): Builder[Mat] = { out ~> j; self } def via[U, M](f: Flow[T, U, M]): ForwardOps[U] = from((out ~> f.asScala).outlet) def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet) def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) + def out(): Outlet[T] = out } - class ReverseOps[T](out: Inlet[T]) { - def from(dst: Outlet[T]): Builder = { out <~ dst; self } - def from[M](dst: Source[T, M]): Builder = { out <~ dst.asScala; self } - def from(dst: SourceShape[T]): Builder = { out <~ dst; self } - def from[U](f: FlowShape[U, T]): Builder = { out <~ f; self } - def from[U](j: UniformFanInShape[U, T]): Builder = { out <~ j; self } - def from[U](j: UniformFanOutShape[U, T]): Builder = { out <~ j; self } + final class ReverseOps[T](out: Inlet[T]) { + def from(dst: Outlet[T]): Builder[Mat] = { out <~ dst; self } + def from[M](dst: Source[T, M]): Builder[Mat] = { out <~ dst.asScala; self } + def from(dst: SourceShape[T]): Builder[Mat] = { out <~ dst; self } + def from[U](f: FlowShape[U, T]): Builder[Mat] = { out <~ f; self } + def from[U](j: UniformFanInShape[U, T]): Builder[Mat] = { out <~ j; self } + def from[U](j: UniformFanOutShape[U, T]): Builder[Mat] = { out <~ j; self } def via[U, M](f: Flow[U, T, M]): ReverseOps[U] = to((out <~ f.asScala).inlet) def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet) def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 128a9acf66..d17c3f572c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -171,10 +171,10 @@ object Concat { object FlowGraph extends GraphApply { - class Builder private[stream] () { + class Builder[+M] private[stream] () { private var moduleInProgress: Module = EmptyModule - def addEdge[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = { + def addEdge[A, B, M2](from: Outlet[A], via: Flow[A, B, M2], to: Inlet[B]): Unit = { val flowCopy = via.module.carbonCopy moduleInProgress = moduleInProgress @@ -215,6 +215,26 @@ object FlowGraph extends GraphApply { def add[T](s: Source[T, _]): Outlet[T] = add(s: Graph[SourceShape[T], _]).outlet def add[T](s: Sink[T, _]): Inlet[T] = add(s: Graph[SinkShape[T], _]).inlet + /** + * Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized + * this outlet will emit exactly one element which is the materialized value. It is possible to expose this + * outlet as an externally accessible outlet of a [[Source]], [[Sink]], [[Flow]] or [[BidiFlow]]. + * + * It is possible to call this method multiple times to get multiple [[Outlet]] instances if necessary. All of + * the outlets will emit the materialized value. + * + * Be careful to not to feed the result of this outlet to a stage that produces the materialized value itself (for + * example to a [[Sink#fold]] that contributes to the materialized value) since that might lead to an unresolvable + * dependency cycle. + * + * @return The outlet that will emit the materialized value. + */ + def matValue: Outlet[M] = { + val module = new MaterializedValueSource[Any] + moduleInProgress = moduleInProgress.grow(module) + module.shape.outlet.asInstanceOf[Outlet[M]] + } + private[stream] def andThen(port: OutPort, op: StageModule): Unit = { moduleInProgress = moduleInProgress @@ -228,7 +248,7 @@ object FlowGraph extends GraphApply { "Cannot build the RunnableFlow because there are unconnected ports: " + (moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", ")) } - new RunnableFlow(moduleInProgress) + new RunnableFlow(moduleInProgress.wrap()) } private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = { @@ -239,7 +259,7 @@ object FlowGraph extends GraphApply { s"Cannot build Source with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})") if (moduleInProgress.outPorts.head != outlet) throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the module’s open Outlet ${moduleInProgress.outPorts.head}") - new Source(moduleInProgress.replaceShape(SourceShape(outlet))) + new Source(moduleInProgress.replaceShape(SourceShape(outlet)).wrap()) } private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = { @@ -250,7 +270,7 @@ object FlowGraph extends GraphApply { throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the module’s open Outlet ${moduleInProgress.outPorts.head}") if (moduleInProgress.inPorts.head != inlet) throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the module’s open Inlet ${moduleInProgress.inPorts.head}") - new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet))) + new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet)).wrap()) } private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { @@ -261,7 +281,7 @@ object FlowGraph extends GraphApply { throw new IllegalArgumentException(s"provided Outlets [${shape.outlets.mkString(",")}] does not equal the module’s open Outlets [${moduleInProgress.outPorts.mkString(",")}]") if (moduleInProgress.inPorts.toSet != shape.inlets.toSet) throw new IllegalArgumentException(s"provided Inlets [${shape.inlets.mkString(",")}] does not equal the module’s open Inlets [${moduleInProgress.inPorts.mkString(",")}]") - new BidiFlow(moduleInProgress.replaceShape(shape)) + new BidiFlow(moduleInProgress.replaceShape(shape).wrap()) } private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = { @@ -272,7 +292,7 @@ object FlowGraph extends GraphApply { s"Cannot build Sink with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})") if (moduleInProgress.inPorts.head != inlet) throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the module’s open Inlet ${moduleInProgress.inPorts.head}") - new Sink(moduleInProgress.replaceShape(SinkShape(inlet))) + new Sink(moduleInProgress.replaceShape(SinkShape(inlet)).wrap()) } private[stream] def module: Module = moduleInProgress @@ -282,7 +302,7 @@ object FlowGraph extends GraphApply { object Implicits { @tailrec - private[stream] def findOut[I, O](b: Builder, junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = { + private[stream] def findOut[I, O](b: Builder[_], junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = { if (n == junction.outArray.length) throw new IllegalArgumentException(s"no more outlets free on $junction") else if (b.module.downstreams.contains(junction.out(n))) findOut(b, junction, n + 1) @@ -290,7 +310,7 @@ object FlowGraph extends GraphApply { } @tailrec - private[stream] def findIn[I, O](b: Builder, junction: UniformFanInShape[I, O], n: Int): Inlet[I] = { + private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = { if (n == junction.inArray.length) throw new IllegalArgumentException(s"no more inlets free on $junction") else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1) @@ -298,19 +318,19 @@ object FlowGraph extends GraphApply { } trait CombinerBase[T] extends Any { - def importAndGetPort(b: Builder): Outlet[T] + def importAndGetPort(b: Builder[_]): Outlet[T] - def ~>(to: Inlet[T])(implicit b: Builder): Unit = { + def ~>(to: Inlet[T])(implicit b: Builder[_]): Unit = { b.addEdge(importAndGetPort(b), to) } - def ~>[Out](via: Flow[T, Out, _])(implicit b: Builder): PortOps[Out, Unit] = { + def ~>[Out](via: Flow[T, Out, _])(implicit b: Builder[_]): PortOps[Out, Unit] = { val s = b.add(via) b.addEdge(importAndGetPort(b), s.inlet) s.outlet } - def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder): PortOps[Out, Unit] = { + def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = { def bind(n: Int): Unit = { if (n == junction.inArray.length) throw new IllegalArgumentException(s"no more inlets free on $junction") @@ -321,7 +341,7 @@ object FlowGraph extends GraphApply { junction.out } - def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder): PortOps[Out, Unit] = { + def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = { b.addEdge(importAndGetPort(b), junction.in) try findOut(b, junction, 0) catch { @@ -329,34 +349,34 @@ object FlowGraph extends GraphApply { } } - def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder): PortOps[Out, Unit] = { + def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = { b.addEdge(importAndGetPort(b), flow.inlet) flow.outlet } - def ~>(to: Sink[T, _])(implicit b: Builder): Unit = { + def ~>(to: Sink[T, _])(implicit b: Builder[_]): Unit = { b.addEdge(importAndGetPort(b), b.add(to)) } - def ~>(to: SinkShape[T])(implicit b: Builder): Unit = { + def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = { b.addEdge(importAndGetPort(b), to.inlet) } } trait ReverseCombinerBase[T] extends Any { - def importAndGetPortReverse(b: Builder): Inlet[T] + def importAndGetPortReverse(b: Builder[_]): Inlet[T] - def <~(from: Outlet[T])(implicit b: Builder): Unit = { + def <~(from: Outlet[T])(implicit b: Builder[_]): Unit = { b.addEdge(from, importAndGetPortReverse(b)) } - def <~[In](via: Flow[In, T, _])(implicit b: Builder): ReversePortOps[In] = { + def <~[In](via: Flow[In, T, _])(implicit b: Builder[_]): ReversePortOps[In] = { val s = b.add(via) b.addEdge(s.outlet, importAndGetPortReverse(b)) s.inlet } - def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder): ReversePortOps[In] = { + def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = { def bind(n: Int): Unit = { if (n == junction.outArray.length) throw new IllegalArgumentException(s"no more outlets free on $junction") @@ -367,7 +387,7 @@ object FlowGraph extends GraphApply { junction.in } - def <~[In](junction: UniformFanInShape[In, T])(implicit b: Builder): ReversePortOps[In] = { + def <~[In](junction: UniformFanInShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = { b.addEdge(junction.out, importAndGetPortReverse(b)) try findIn(b, junction, 0) catch { @@ -375,21 +395,21 @@ object FlowGraph extends GraphApply { } } - def <~[In](flow: FlowShape[In, T])(implicit b: Builder): ReversePortOps[In] = { + def <~[In](flow: FlowShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = { b.addEdge(flow.outlet, importAndGetPortReverse(b)) flow.inlet } - def <~(from: Source[T, _])(implicit b: Builder): Unit = { + def <~(from: Source[T, _])(implicit b: Builder[_]): Unit = { b.addEdge(b.add(from), importAndGetPortReverse(b)) } - def <~(from: SourceShape[T])(implicit b: Builder): Unit = { + def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit = { b.addEdge(from.outlet, importAndGetPortReverse(b)) } } - class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder) extends FlowOps[Out, Mat] with CombinerBase[Out] { + class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder[_]) extends FlowOps[Out, Mat] with CombinerBase[Out] { override type Repr[+O, +M] = PortOps[O, M] @uncheckedVariance override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = @@ -406,55 +426,55 @@ object FlowGraph extends GraphApply { new PortOps(op.shape.outlet.asInstanceOf[Outlet[U]], b) } - override def importAndGetPort(b: Builder): Outlet[Out] = outlet + override def importAndGetPort(b: Builder[_]): Outlet[Out] = outlet } class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) { - override def importAndGetPort(b: Builder): Outlet[Out] = throw new IllegalArgumentException(msg) + override def importAndGetPort(b: Builder[_]): Outlet[Out] = throw new IllegalArgumentException(msg) } implicit class ReversePortOps[In](val inlet: Inlet[In]) extends ReverseCombinerBase[In] { - override def importAndGetPortReverse(b: Builder): Inlet[In] = inlet + override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = inlet } class DisabledReversePortOps[In](msg: String) extends ReversePortOps[In](null) { - override def importAndGetPortReverse(b: Builder): Inlet[In] = throw new IllegalArgumentException(msg) + override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = throw new IllegalArgumentException(msg) } implicit class FanInOps[In, Out](val j: UniformFanInShape[In, Out]) extends AnyVal with CombinerBase[Out] with ReverseCombinerBase[In] { - override def importAndGetPort(b: Builder): Outlet[Out] = j.out - override def importAndGetPortReverse(b: Builder): Inlet[In] = findIn(b, j, 0) + override def importAndGetPort(b: Builder[_]): Outlet[Out] = j.out + override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = findIn(b, j, 0) } implicit class FanOutOps[In, Out](val j: UniformFanOutShape[In, Out]) extends AnyVal with ReverseCombinerBase[In] { - override def importAndGetPortReverse(b: Builder): Inlet[In] = j.in + override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = j.in } implicit class SinkArrow[T](val s: Sink[T, _]) extends AnyVal with ReverseCombinerBase[T] { - override def importAndGetPortReverse(b: Builder): Inlet[T] = b.add(s) + override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s) } implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] { - override def importAndGetPortReverse(b: Builder): Inlet[T] = s.inlet + override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = s.inlet } implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] { - override def importAndGetPortReverse(b: Builder): Inlet[I] = f.inlet + override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet - def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val shape = b.add(bidi) b.addEdge(f.outlet, shape.in1) b.addEdge(shape.out2, f.inlet) shape } - def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { b.addEdge(f.outlet, bidi.in1) b.addEdge(bidi.out2, f.inlet) bidi } - def <~>[M](flow: Flow[O, I, M])(implicit b: Builder): Unit = { + def <~>[M](flow: Flow[O, I, M])(implicit b: Builder[_]): Unit = { val shape = b.add(flow) b.addEdge(shape.outlet, f.inlet) b.addEdge(f.outlet, shape.inlet) @@ -462,7 +482,7 @@ object FlowGraph extends GraphApply { } implicit class FlowArrow[I, O, M](val f: Flow[I, O, M]) extends AnyVal { - def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val shape = b.add(bidi) val flow = b.add(f) b.addEdge(flow.outlet, shape.in1) @@ -470,14 +490,14 @@ object FlowGraph extends GraphApply { shape } - def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder): BidiShape[O, O2, I2, I] = { + def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = { val flow = b.add(f) b.addEdge(flow.outlet, bidi.in1) b.addEdge(bidi.out2, flow.inlet) bidi } - def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder): Unit = { + def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder[_]): Unit = { val shape = b.add(flow) val ff = b.add(f) b.addEdge(shape.outlet, ff.inlet) @@ -486,25 +506,25 @@ object FlowGraph extends GraphApply { } implicit class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal { - def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder): BidiShape[O1, O3, I3, I2] = { + def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = { b.addEdge(bidi.out1, other.in1) b.addEdge(other.out2, bidi.in2) other } - def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder): BidiShape[O1, O3, I3, I2] = { + def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = { val other = b.add(otherFlow) b.addEdge(bidi.out1, other.in1) b.addEdge(other.out2, bidi.in2) other } - def <~>(flow: FlowShape[O1, I2])(implicit b: Builder): Unit = { + def <~>(flow: FlowShape[O1, I2])(implicit b: Builder[_]): Unit = { b.addEdge(bidi.out1, flow.inlet) b.addEdge(flow.outlet, bidi.in2) } - def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder): Unit = { + def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder[_]): Unit = { val flow = b.add(f) b.addEdge(bidi.out1, flow.inlet) b.addEdge(flow.outlet, bidi.in2) @@ -513,21 +533,21 @@ object FlowGraph extends GraphApply { import scala.language.implicitConversions - implicit def port2flow[T](from: Outlet[T])(implicit b: Builder): PortOps[T, Unit] = + implicit def port2flow[T](from: Outlet[T])(implicit b: Builder[_]): PortOps[T, Unit] = new PortOps(from, b) - implicit def fanOut2flow[I, O](j: UniformFanOutShape[I, O])(implicit b: Builder): PortOps[O, Unit] = + implicit def fanOut2flow[I, O](j: UniformFanOutShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] = new PortOps(findOut(b, j, 0), b) - implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder): PortOps[O, Unit] = + implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] = new PortOps(f.outlet, b) implicit class SourceArrow[T](val s: Source[T, _]) extends AnyVal with CombinerBase[T] { - override def importAndGetPort(b: Builder): Outlet[T] = b.add(s) + override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s) } implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] { - override def importAndGetPort(b: Builder): Outlet[T] = s.outlet + override def importAndGetPort(b: Builder[_]): Outlet[T] = s.outlet } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala index 44ce29cba8..093aba8d5e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JavaConverters.scala @@ -23,8 +23,8 @@ private[akka] object JavaConverters { implicit final class AddAsJavaSink[In, Mat](val sink: scaladsl.Sink[In, Mat]) extends AnyVal { def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink) } - implicit final class AsAsJavaFlowGraphBuilder[Out](val builder: scaladsl.FlowGraph.Builder) extends AnyVal { - def asJava: javadsl.FlowGraph.Builder = new javadsl.FlowGraph.Builder()(builder) + implicit final class AsAsJavaFlowGraphBuilder[Out, Mat](val builder: scaladsl.FlowGraph.Builder[Mat]) extends AnyVal { + def asJava: javadsl.FlowGraph.Builder[Mat] = new javadsl.FlowGraph.Builder()(builder) } implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal { @@ -39,7 +39,7 @@ private[akka] object JavaConverters { implicit final class AddAsScalaSink[In, Mat](val sink: javadsl.Sink[In, Mat]) extends AnyVal { def asScala: scaladsl.Sink[In, Mat] = sink.asScala } - implicit final class AsAsScalaFlowGraphBuilder[Out](val builder: javadsl.FlowGraph.Builder) extends AnyVal { - def asScala: FlowGraph.Builder = builder.asScala + implicit final class AsAsScalaFlowGraphBuilder[Out, Mat](val builder: javadsl.FlowGraph.Builder[Mat]) extends AnyVal { + def asScala: FlowGraph.Builder[Mat] = builder.asScala } }