Merge pull request #17097 from drewhk/wip-16168-mat-value-source-drewhk

!str #16168: Expose materialized value in the graph as a "source"
This commit is contained in:
drewhk 2015-04-07 13:28:32 +02:00
commit a65adc5f16
33 changed files with 601 additions and 173 deletions

View file

@ -206,6 +206,9 @@ resulting values. Some examples of using these combiners are illustrated in the
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-mat-combine .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-mat-combine
.. note::
In Graphs it is possible to access the materialized value from inside the stream processing graph. For details see
:ref:`graph-matvalue-java`
Stream ordering Stream ordering
=============== ===============

View file

@ -194,6 +194,24 @@ together and also turned around with the ``.reversed()`` method. The test
simulates both parties of a network communication protocol without actually simulates both parties of a network communication protocol without actually
having to open a network connection—the flows can just be connected directly. having to open a network connection—the flows can just be connected directly.
.. _graph-matvalue-java:
Accessing the materialized value inside the Graph
-------------------------------------------------
In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a
Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that
can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value.
If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times
to acquire the necessary number of outlets.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue
Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value.
The following example demonstrates a case where the materialized ``Future`` of a fold is fed back to the fold itself.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue-cycle
.. _graph-cycles-java: .. _graph-cycles-java:
Graph cycles, liveness and deadlocks Graph cycles, liveness and deadlocks

View file

@ -8,7 +8,7 @@ import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._ import scala.concurrent.duration._
class FlowGraphDocSpec extends AkkaSpec { class FlowGraphDocSpec extends AkkaSpec {
@ -20,7 +20,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph" in { "build simple graph" in {
//format: OFF //format: OFF
//#simple-flow-graph //#simple-flow-graph
val g = FlowGraph.closed() { implicit builder: FlowGraph.Builder => val g = FlowGraph.closed() { implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._ import FlowGraph.Implicits._
val in = Source(1 to 10) val in = Source(1 to 10)
val out = Sink.ignore val out = Sink.ignore
@ -43,7 +43,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph without implicits" in { "build simple graph without implicits" in {
//#simple-flow-graph-no-implicits //#simple-flow-graph-no-implicits
val g = FlowGraph.closed() { builder: FlowGraph.Builder => val g = FlowGraph.closed() { builder: FlowGraph.Builder[Unit] =>
val in = Source(1 to 10) val in = Source(1 to 10)
val out = Sink.ignore val out = Sink.ignore
@ -219,4 +219,33 @@ class FlowGraphDocSpec extends AkkaSpec {
} }
"access to materialized value" in {
//#flow-graph-matvalue
import FlowGraph.Implicits._
val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder
fold
(fold.inlet, builder.matValue.mapAsync(identity).outlet)
}
//#flow-graph-matvalue
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
//#flow-graph-matvalue-cycle
import FlowGraph.Implicits._
// This cannot produce any value:
val cyclicFold: Source[Int, Future[Int]] = Source(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder =>
fold =>
// - Fold cannot complete until its upstream mapAsync completes
// - mapAsync cannot complete until the materialized Future produced by
// fold completes
// As a result this Source will never emit anything, and its materialited
// Future will never complete
builder.matValue.mapAsync(identity) ~> fold
builder.matValue.mapAsync(identity).outlet
}
//#flow-graph-matvalue-cycle
}
} }

View file

@ -210,6 +210,10 @@ resulting values. Some examples of using these combiners are illustrated in the
.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-mat-combine .. includecode:: code/docs/stream/FlowDocSpec.scala#flow-mat-combine
.. note::
In Graphs it is possible to access the materialized value from inside the stream processing graph. For details see
:ref:`graph-matvalue-scala`
Stream ordering Stream ordering
=============== ===============
In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}`` In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}``

View file

@ -248,6 +248,24 @@ together and also turned around with the ``.reversed`` method. The test
simulates both parties of a network communication protocol without actually simulates both parties of a network communication protocol without actually
having to open a network connection—the flows can just be connected directly. having to open a network connection—the flows can just be connected directly.
.. _graph-matvalue-scala:
Accessing the materialized value inside the Graph
-------------------------------------------------
In certain cases it might be necessary to feed back the materialized value of a Graph (partial, closed or backing a
Source, Sink, Flow or BidiFlow). This is possible by using ``builder.matValue`` which gives an ``Outlet`` that
can be used in the graph as an ordinary source or outlet, and which will eventually emit the materialized value.
If the materialized value is needed at more than one place, it is possible to call ``matValue`` any number of times
to acquire the necessary number of outlets.
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue
Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value.
The following example demonstrates a case where the materialized ``Future`` of a fold is fed back to the fold itself.
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue-cycle
.. _graph-cycles-scala: .. _graph-cycles-scala:
Graph cycles, liveness and deadlocks Graph cycles, liveness and deadlocks

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.impl.MaterializedValuePublisher
import org.reactivestreams.Publisher
class MaterializedValuePublisherTest extends AkkaPublisherVerification[Any] {
override def createPublisher(elements: Long): Publisher[Any] = {
val pub = new MaterializedValuePublisher()
// it contains a value already
pub.setValue("Hello")
pub
}
override def maxElementsFromPublisher = 1
}

View file

@ -17,13 +17,13 @@ abstract class TwoStreamsSetup extends AkkaSpec {
type Outputs type Outputs
abstract class Fixture(b: FlowGraph.Builder) { abstract class Fixture(b: FlowGraph.Builder[_]) {
def left: Inlet[Int] def left: Inlet[Int]
def right: Inlet[Int] def right: Inlet[Int]
def out: Outlet[Outputs] def out: Outlet[Outputs]
} }
def fixture(b: FlowGraph.Builder): Fixture def fixture(b: FlowGraph.Builder[_]): Fixture
def setup(p1: Publisher[Int], p2: Publisher[Int]) = { def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val subscriber = StreamTestKit.SubscriberProbe[Outputs]() val subscriber = StreamTestKit.SubscriberProbe[Outputs]()

View file

@ -37,9 +37,9 @@ public class BidiFlowTest extends StreamTest {
private final BidiFlow<Integer, Long, ByteString, String, BoxedUnit> bidi = BidiFlow private final BidiFlow<Integer, Long, ByteString, String, BoxedUnit> bidi = BidiFlow
.factory() .factory()
.create( .create(
new Function<FlowGraph.Builder, BidiShape<Integer, Long, ByteString, String>>() { new Function<FlowGraph.Builder<BoxedUnit>, BidiShape<Integer, Long, ByteString, String>>() {
@Override @Override
public BidiShape<Integer, Long, ByteString, String> apply(Builder b) public BidiShape<Integer, Long, ByteString, String> apply(Builder<BoxedUnit> b)
throws Exception { throws Exception {
final FlowShape<Integer, Long> top = b.graph(Flow final FlowShape<Integer, Long> top = b.graph(Flow
.<Integer> empty().map(new Function<Integer, Long>() { .<Integer> empty().map(new Function<Integer, Long>() {
@ -63,9 +63,9 @@ public class BidiFlowTest extends StreamTest {
private final BidiFlow<Long, Integer, String, ByteString, BoxedUnit> inverse = BidiFlow private final BidiFlow<Long, Integer, String, ByteString, BoxedUnit> inverse = BidiFlow
.factory() .factory()
.create( .create(
new Function<FlowGraph.Builder, BidiShape<Long, Integer, String, ByteString>>() { new Function<FlowGraph.Builder<BoxedUnit>, BidiShape<Long, Integer, String, ByteString>>() {
@Override @Override
public BidiShape<Long, Integer, String, ByteString> apply(Builder b) public BidiShape<Long, Integer, String, ByteString> apply(Builder<BoxedUnit> b)
throws Exception { throws Exception {
final FlowShape<Long, Integer> top = b.graph(Flow.<Long> empty() final FlowShape<Long, Integer> top = b.graph(Flow.<Long> empty()
.map(new Function<Long, Integer>() { .map(new Function<Long, Integer>() {
@ -90,9 +90,9 @@ public class BidiFlowTest extends StreamTest {
.factory() .factory()
.create( .create(
Sink.<Integer> head(), Sink.<Integer> head(),
new Function2<FlowGraph.Builder, SinkShape<Integer>, BidiShape<Integer, Long, ByteString, String>>() { new Function2<FlowGraph.Builder<Future<Integer>>, SinkShape<Integer>, BidiShape<Integer, Long, ByteString, String>>() {
@Override @Override
public BidiShape<Integer, Long, ByteString, String> apply(Builder b, SinkShape<Integer> sink) public BidiShape<Integer, Long, ByteString, String> apply(Builder<Future<Integer>> b, SinkShape<Integer> sink)
throws Exception { throws Exception {
b.from(Source.single(42)).to(sink); b.from(Source.single(42)).to(sink);
final FlowShape<Integer, Long> top = b.graph(Flow final FlowShape<Integer, Long> top = b.graph(Flow
@ -130,9 +130,9 @@ public class BidiFlowTest extends StreamTest {
.factory() .factory()
.closed(Sink.<Long> head(), Sink.<String> head(), .closed(Sink.<Long> head(), Sink.<String> head(),
Keep.<Future<Long>, Future<String>> both(), Keep.<Future<Long>, Future<String>> both(),
new Procedure3<Builder, SinkShape<Long>, SinkShape<String>>() { new Procedure3<Builder<Pair<Future<Long>, Future<String>>>, SinkShape<Long>, SinkShape<String>>() {
@Override @Override
public void apply(Builder b, SinkShape<Long> st, public void apply(Builder<Pair<Future<Long>, Future<String>>> b, SinkShape<Long> st,
SinkShape<String> sb) throws Exception { SinkShape<String> sb) throws Exception {
final BidiShape<Integer, Long, ByteString, String> s = b final BidiShape<Integer, Long, ByteString, String> s = b
.graph(bidi); .graph(bidi);
@ -201,9 +201,9 @@ public class BidiFlowTest extends StreamTest {
@Test @Test
public void mustMaterializeToItsValue() throws Exception { public void mustMaterializeToItsValue() throws Exception {
final Future<Integer> f = FlowGraph.factory().closed(bidiMat, new Procedure2<Builder, BidiShape<Integer, Long, ByteString, String>>() { final Future<Integer> f = FlowGraph.factory().closed(bidiMat, new Procedure2<Builder<Future<Integer> >, BidiShape<Integer, Long, ByteString, String>>() {
@Override @Override
public void apply(Builder b, public void apply(Builder<Future<Integer>> b,
BidiShape<Integer, Long, ByteString, String> shape) throws Exception { BidiShape<Integer, Long, ByteString, String> shape) throws Exception {
final FlowShape<String, Integer> left = b.graph(Flow.<String> empty().map( final FlowShape<String, Integer> left = b.graph(Flow.<String> empty().map(
new Function<String, Integer>() { new Function<String, Integer>() {
@ -227,9 +227,9 @@ public class BidiFlowTest extends StreamTest {
@Test @Test
public void mustCombineMaterializationValues() throws Exception { public void mustCombineMaterializationValues() throws Exception {
final Flow<String, Integer, Future<Integer>> left = Flow.factory().create( final Flow<String, Integer, Future<Integer>> left = Flow.factory().create(
Sink.<Integer> head(), new Function2<Builder, SinkShape<Integer>, Pair<Inlet<String>, Outlet<Integer>>>() { Sink.<Integer> head(), new Function2<Builder<Future<Integer> >, SinkShape<Integer>, Pair<Inlet<String>, Outlet<Integer>>>() {
@Override @Override
public Pair<Inlet<String>, Outlet<Integer>> apply(Builder b, public Pair<Inlet<String>, Outlet<Integer>> apply(Builder<Future<Integer>> b,
SinkShape<Integer> sink) throws Exception { SinkShape<Integer> sink) throws Exception {
final UniformFanOutShape<Integer, Integer> bcast = b.graph(Broadcast.<Integer> create(2)); final UniformFanOutShape<Integer, Integer> bcast = b.graph(Broadcast.<Integer> create(2));
final UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer> create(2)); final UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer> create(2));
@ -247,9 +247,9 @@ public class BidiFlowTest extends StreamTest {
} }
}); });
final Flow<Long, ByteString, Future<List<Long>>> right = Flow.factory().create( final Flow<Long, ByteString, Future<List<Long>>> right = Flow.factory().create(
Sink.<List<Long>> head(), new Function2<Builder, SinkShape<List<Long>>, Pair<Inlet<Long>, Outlet<ByteString>>>() { Sink.<List<Long>> head(), new Function2<Builder<Future<List<Long>>>, SinkShape<List<Long>>, Pair<Inlet<Long>, Outlet<ByteString>>>() {
@Override @Override
public Pair<Inlet<Long>, Outlet<ByteString>> apply(Builder b, public Pair<Inlet<Long>, Outlet<ByteString>> apply(Builder<Future<List<Long>>> b,
SinkShape<List<Long>> sink) throws Exception { SinkShape<List<Long>> sink) throws Exception {
final FlowShape<Long, List<Long>> flow = b.graph(Flow.<Long> empty().grouped(10)); final FlowShape<Long, List<Long>> flow = b.graph(Flow.<Long> empty().grouped(10));
b.from(flow).to(sink); b.from(flow).to(sink);

View file

@ -47,9 +47,9 @@ public class FlexiMergeTest {
final Future<List<String>> all = FlowGraph final Future<List<String>> all = FlowGraph
.factory() .factory()
.closed(Sink.<List<String>> head(), .closed(Sink.<List<String>> head(),
new Procedure2<Builder, SinkShape<List<String>>>() { new Procedure2<Builder<Future<List<String>> >, SinkShape<List<String>>>() {
@Override @Override
public void apply(Builder b, SinkShape<List<String>> sink) public void apply(Builder<Future<List<String>> > b, SinkShape<List<String>> sink)
throws Exception { throws Exception {
final UniformFanInShape<String, String> merge = b.graph(new Fair<String>()); final UniformFanInShape<String, String> merge = b.graph(new Fair<String>());
b.edge(b.source(in1), merge.in(0)); b.edge(b.source(in1), merge.in(0));
@ -69,9 +69,9 @@ public class FlexiMergeTest {
final Future<List<String>> all = FlowGraph final Future<List<String>> all = FlowGraph
.factory() .factory()
.closed(Sink.<List<String>> head(), .closed(Sink.<List<String>> head(),
new Procedure2<Builder, SinkShape<List<String>>>() { new Procedure2<Builder<Future<List<String>>>, SinkShape<List<String>>>() {
@Override @Override
public void apply(Builder b, SinkShape<List<String>> sink) public void apply(Builder<Future<List<String>>> b, SinkShape<List<String>> sink)
throws Exception { throws Exception {
final UniformFanInShape<String, String> merge = b.graph(new StrictRoundRobin<String>()); final UniformFanInShape<String, String> merge = b.graph(new StrictRoundRobin<String>());
b.edge(b.source(in1), merge.in(0)); b.edge(b.source(in1), merge.in(0));
@ -93,9 +93,9 @@ public class FlexiMergeTest {
final Future<List<Pair<Integer, String>>> all = FlowGraph final Future<List<Pair<Integer, String>>> all = FlowGraph
.factory() .factory()
.closed(Sink.<List<Pair<Integer, String>>>head(), .closed(Sink.<List<Pair<Integer, String>>>head(),
new Procedure2<Builder, SinkShape<List<Pair<Integer, String>>>>() { new Procedure2<Builder<Future<List<Pair<Integer, String>>>>, SinkShape<List<Pair<Integer, String>>>>() {
@Override @Override
public void apply(Builder b, SinkShape<List<Pair<Integer, String>>> sink) public void apply(Builder<Future<List<Pair<Integer, String>>>> b, SinkShape<List<Pair<Integer, String>>> sink)
throws Exception { throws Exception {
final FanInShape2<Integer, String, Pair<Integer, String>> zip = b.graph(new Zip<Integer, String>()); final FanInShape2<Integer, String, Pair<Integer, String>> zip = b.graph(new Zip<Integer, String>());
b.edge(b.source(inA), zip.in0()); b.edge(b.source(inA), zip.in0());
@ -120,9 +120,9 @@ public class FlexiMergeTest {
final Future<List<Triple<Long, Integer, String>>> all = FlowGraph final Future<List<Triple<Long, Integer, String>>> all = FlowGraph
.factory() .factory()
.closed(Sink.<List<Triple<Long, Integer, String>>> head(), .closed(Sink.<List<Triple<Long, Integer, String>>> head(),
new Procedure2<Builder, SinkShape<List<Triple<Long, Integer, String>>>>() { new Procedure2<Builder<Future<List<Triple<Long, Integer, String>>>>, SinkShape<List<Triple<Long, Integer, String>>>>() {
@Override @Override
public void apply(Builder b, SinkShape<List<Triple<Long, Integer, String>>> sink) public void apply(Builder<Future<List<Triple<Long, Integer, String>>>> b, SinkShape<List<Triple<Long, Integer, String>>> sink)
throws Exception { throws Exception {
final FanInShape3<Long, Integer, String, Triple<Long, Integer, String>> zip = final FanInShape3<Long, Integer, String, Triple<Long, Integer, String>> zip =
b.graph(new TripleZip<Long, Integer, String>()); b.graph(new TripleZip<Long, Integer, String>());

View file

@ -50,9 +50,9 @@ public class FlexiRouteTest {
out1, out1,
out2, out2,
Keep.<Future<List<String>>, Future<List<String>>> both(), Keep.<Future<List<String>>, Future<List<String>>> both(),
new Procedure3<Builder, SinkShape<List<String>>, SinkShape<List<String>>>() { new Procedure3<Builder<Pair<Future<List<String>>, Future<List<String>>>>, SinkShape<List<String>>, SinkShape<List<String>>>() {
@Override @Override
public void apply(Builder b, SinkShape<List<String>> o1, public void apply(Builder<Pair<Future<List<String>>, Future<List<String>>>> b, SinkShape<List<String>> o1,
SinkShape<List<String>> o2) throws Exception { SinkShape<List<String>> o2) throws Exception {
final UniformFanOutShape<String, String> fair = b.graph(new Fair<String>()); final UniformFanOutShape<String, String> fair = b.graph(new Fair<String>());
b.edge(b.source(in), fair.in()); b.edge(b.source(in), fair.in());
@ -80,9 +80,9 @@ public class FlexiRouteTest {
out1, out1,
out2, out2,
Keep.<Future<List<String>>, Future<List<String>>> both(), Keep.<Future<List<String>>, Future<List<String>>> both(),
new Procedure3<Builder, SinkShape<List<String>>, SinkShape<List<String>>>() { new Procedure3<Builder<Pair<Future<List<String>>, Future<List<String>>>>, SinkShape<List<String>>, SinkShape<List<String>>>() {
@Override @Override
public void apply(Builder b, SinkShape<List<String>> o1, public void apply(Builder<Pair<Future<List<String>>, Future<List<String>>>> b, SinkShape<List<String>> o1,
SinkShape<List<String>> o2) throws Exception { SinkShape<List<String>> o2) throws Exception {
final UniformFanOutShape<String, String> robin = b.graph(new StrictRoundRobin<String>()); final UniformFanOutShape<String, String> robin = b.graph(new StrictRoundRobin<String>());
b.edge(b.source(in), robin.in()); b.edge(b.source(in), robin.in());
@ -112,9 +112,9 @@ public class FlexiRouteTest {
Sink.<List<Integer>> head(), Sink.<List<Integer>> head(),
out2, out2,
Keep.<Future<List<Integer>>, Future<List<String>>> both(), Keep.<Future<List<Integer>>, Future<List<String>>> both(),
new Procedure3<Builder, SinkShape<List<Integer>>, SinkShape<List<String>>>() { new Procedure3<Builder<Pair<Future<List<Integer>>, Future<List<String>>> >, SinkShape<List<Integer>>, SinkShape<List<String>>>() {
@Override @Override
public void apply(Builder b, SinkShape<List<Integer>> o1, public void apply(Builder<Pair<Future<List<Integer>>, Future<List<String>>> > b, SinkShape<List<Integer>> o1,
SinkShape<List<String>> o2) throws Exception { SinkShape<List<String>> o2) throws Exception {
final FanOutShape2<Pair<Integer, String>, Integer, String> unzip = b.graph(new Unzip<Integer, String>()); final FanOutShape2<Pair<Integer, String>, Integer, String> unzip = b.graph(new Unzip<Integer, String>());
final Outlet<Pair<Integer, String>> src = b.source(Source.from(pairs)); final Outlet<Pair<Integer, String>> src = b.source(Source.from(pairs));

View file

@ -16,6 +16,7 @@ import akka.stream.javadsl.japi.*;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -100,9 +101,9 @@ public class FlowGraphTest extends StreamTest {
final Sink<String, Publisher<String>> publisher = Sink.publisher(); final Sink<String, Publisher<String>> publisher = Sink.publisher();
final Source<String, BoxedUnit> source = Source.factory().create(new Function<FlowGraph.Builder, Outlet<String>>() { final Source<String, BoxedUnit> source = Source.factory().create(new Function<FlowGraph.Builder<BoxedUnit>, Outlet<String>>() {
@Override @Override
public Outlet<String> apply(Builder b) throws Exception { public Outlet<String> apply(Builder<BoxedUnit> b) throws Exception {
final UniformFanInShape<String, String> merge = b.graph(Merge.<String> create(2)); final UniformFanInShape<String, String> merge = b.graph(Merge.<String> create(2));
b.flow(b.source(in1), f1, merge.in(0)); b.flow(b.source(in1), f1, merge.in(0));
b.flow(b.source(in2), f2, merge.in(1)); b.flow(b.source(in2), f2, merge.in(1));
@ -124,7 +125,7 @@ public class FlowGraphTest extends StreamTest {
final Iterable<String> input1 = Arrays.asList("A", "B", "C"); final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<Integer> input2 = Arrays.asList(1, 2, 3); final Iterable<Integer> input2 = Arrays.asList(1, 2, 3);
final Builder b = FlowGraph.builder(); final Builder<BoxedUnit> b = FlowGraph.builder();
final Source<String, BoxedUnit> in1 = Source.from(input1); final Source<String, BoxedUnit> in1 = Source.from(input1);
final Source<Integer, BoxedUnit> in2 = Source.from(input2); final Source<Integer, BoxedUnit> in2 = Source.from(input2);
final FanInShape2<String, Integer, Pair<String,Integer>> zip = b.graph(Zip.<String, Integer>create()); final FanInShape2<String, Integer, Pair<String,Integer>> zip = b.graph(Zip.<String, Integer>create());
@ -160,7 +161,7 @@ public class FlowGraphTest extends StreamTest {
final Iterable<String> expected1 = Arrays.asList("A", "B", "C"); final Iterable<String> expected1 = Arrays.asList("A", "B", "C");
final Iterable<Integer> expected2 = Arrays.asList(1, 2, 3); final Iterable<Integer> expected2 = Arrays.asList(1, 2, 3);
final Builder b = FlowGraph.builder(); final Builder<BoxedUnit> b = FlowGraph.builder();
final Outlet<Pair<String, Integer>> in = b.source(Source.from(input)); final Outlet<Pair<String, Integer>> in = b.source(Source.from(input));
final FanOutShape2<Pair<String, Integer>, String, Integer> unzip = b.graph(Unzip.<String, Integer>create()); final FanOutShape2<Pair<String, Integer>, String, Integer> unzip = b.graph(Unzip.<String, Integer>create());
@ -200,9 +201,9 @@ public class FlowGraphTest extends StreamTest {
} }
}); });
final Future<Integer> future = FlowGraph.factory().closed(Sink.<Integer> head(), new Procedure2<Builder, SinkShape<Integer>>() { final Future<Integer> future = FlowGraph.factory().closed(Sink.<Integer> head(), new Procedure2<Builder<Future<Integer> >, SinkShape<Integer>>() {
@Override @Override
public void apply(Builder b, SinkShape<Integer> out) throws Exception { public void apply(Builder<Future<Integer> > b, SinkShape<Integer> out) throws Exception {
final FanInShape2<Integer, Integer, Integer> zip = b.graph(sumZip); final FanInShape2<Integer, Integer, Integer> zip = b.graph(sumZip);
b.edge(b.source(in1), zip.in0()); b.edge(b.source(in1), zip.in0());
b.edge(b.source(in2), zip.in1()); b.edge(b.source(in2), zip.in1());
@ -228,9 +229,9 @@ public class FlowGraphTest extends StreamTest {
} }
}); });
final Future<Integer> future = FlowGraph.factory().closed(Sink.<Integer> head(), new Procedure2<Builder, SinkShape<Integer>>() { final Future<Integer> future = FlowGraph.factory().closed(Sink.<Integer> head(), new Procedure2<Builder<Future<Integer>>, SinkShape<Integer>>() {
@Override @Override
public void apply(Builder b, SinkShape<Integer> out) throws Exception { public void apply(Builder<Future<Integer>> b, SinkShape<Integer> out) throws Exception {
final FanInShape4<Integer, Integer, Integer, Integer, Integer> zip = b.graph(sumZip); final FanInShape4<Integer, Integer, Integer, Integer, Integer> zip = b.graph(sumZip);
b.edge(b.source(in1), zip.in0()); b.edge(b.source(in1), zip.in0());
b.edge(b.source(in2), zip.in1()); b.edge(b.source(in2), zip.in1());
@ -244,4 +245,30 @@ public class FlowGraphTest extends StreamTest {
assertEquals(1111, (int) result); assertEquals(1111, (int) result);
} }
@Test
public void mustBeAbleToUseMatValue() throws Exception {
final Source<Integer, BoxedUnit> in1 = Source.single(1);
final TestProbe probe = TestProbe.apply(system);
final Future<Integer> future = FlowGraph.factory().closed(Sink.<Integer> head(), new Procedure2<Builder<Future<Integer>>, SinkShape<Integer>>() {
@Override
public void apply(Builder<Future<Integer>> b, SinkShape<Integer> out) throws Exception {
b.from(Source.single(1)).to(out);
b.from(b.matValue()).to(Sink.foreach(new Procedure<Future<Integer>>(){
public void apply(Future<Integer> mat) throws Exception {
probe.ref().tell(mat, ActorRef.noSender());
}
}));
}
}).run(materializer);
final Integer result = Await.result(future, Duration.create(300, TimeUnit.MILLISECONDS));
assertEquals(1, (int) result);
final Future<Integer> future2 = probe.expectMsgClass(Future.class);
final Integer result2 = Await.result(future2, Duration.create(300, TimeUnit.MILLISECONDS));
assertEquals(1, (int) result2);
}
} }

View file

@ -280,9 +280,9 @@ public class FlowTest extends StreamTest {
final Sink<String, Publisher<String>> publisher = Sink.publisher(); final Sink<String, Publisher<String>> publisher = Sink.publisher();
final Source<String, BoxedUnit> source = Source.factory().create(new Function<FlowGraph.Builder, Outlet<String>>() { final Source<String, BoxedUnit> source = Source.factory().create(new Function<FlowGraph.Builder<BoxedUnit>, Outlet<String>>() {
@Override @Override
public Outlet<String> apply(Builder b) throws Exception { public Outlet<String> apply(Builder<BoxedUnit> b) throws Exception {
final UniformFanInShape<String, String> merge = b.graph(Merge.<String> create(2)); final UniformFanInShape<String, String> merge = b.graph(Merge.<String> create(2));
b.flow(b.source(in1), f1, merge.in(0)); b.flow(b.source(in1), f1, merge.in(0));
b.flow(b.source(in2), f2, merge.in(1)); b.flow(b.source(in2), f2, merge.in(1));
@ -304,7 +304,7 @@ public class FlowTest extends StreamTest {
final Iterable<String> input1 = Arrays.asList("A", "B", "C"); final Iterable<String> input1 = Arrays.asList("A", "B", "C");
final Iterable<Integer> input2 = Arrays.asList(1, 2, 3); final Iterable<Integer> input2 = Arrays.asList(1, 2, 3);
final Builder b = FlowGraph.builder(); final Builder<BoxedUnit> b = FlowGraph.<BoxedUnit>builder();
final Outlet<String> in1 = b.source(Source.from(input1)); final Outlet<String> in1 = b.source(Source.from(input1));
final Outlet<Integer> in2 = b.source(Source.from(input2)); final Outlet<Integer> in2 = b.source(Source.from(input2));
final FanInShape2<String, Integer, Pair<String, Integer>> zip = b.graph(Zip.<String, Integer> create()); final FanInShape2<String, Integer, Pair<String, Integer>> zip = b.graph(Zip.<String, Integer> create());

View file

@ -14,7 +14,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
override type Outputs = Int override type Outputs = Int
override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) {
val concat = b add Concat[Outputs]() val concat = b add Concat[Outputs]()
override def left: Inlet[Outputs] = concat.in(0) override def left: Inlet[Outputs] = concat.in(0)

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
class GraphMatValueSpec extends AkkaSpec {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorFlowMaterializer(settings)
import FlowGraph.Implicits._
"A Graph with materialized value" must {
val foldSink = Sink.fold[Int, Int](0)(_ + _)
"expose the materialized value as source" in {
val sub = StreamTestKit.SubscriberProbe[Int]()
val f = FlowGraph.closed(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
b.matValue.mapAsync(identity) ~> Sink(sub)
}.run()
val r1 = Await.result(f, 3.seconds)
sub.expectSubscription().request(1)
val r2 = sub.expectNext()
r1 should ===(r2)
}
"expose the materialized value as source multiple times" in {
val sub = StreamTestKit.SubscriberProbe[Int]()
val f = FlowGraph.closed(foldSink) { implicit b
fold
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold
b.matValue.mapAsync(identity) ~> zip.in0
b.matValue.mapAsync(identity) ~> zip.in1
zip.out ~> Sink(sub)
}.run()
val r1 = Await.result(f, 3.seconds)
sub.expectSubscription().request(1)
val r2 = sub.expectNext()
r1 should ===(r2 / 2)
}
// Exposes the materialized value as a stream value
val foldFeedbackSource: Source[Future[Int], Future[Int]] = Source(foldSink) { implicit b
fold
Source(1 to 10) ~> fold
b.matValue
}
"allow exposing the materialized value as port" in {
val (f1, f2) = foldFeedbackSource.mapAsync(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run()
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(155)
}
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in {
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(identity).map(_ + 100).mapMaterialized((_) ())
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
}
"work properly with nesting and reusing" in {
val compositeSource1 = Source(foldFeedbackSource, foldFeedbackSource)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.outlet.mapAsync(identity) ~> zip.in0
s2.outlet.mapAsync(identity).map(_ * 100) ~> zip.in1
zip.out
}
val compositeSource2 = Source(compositeSource1, compositeSource1)(Keep.both) { implicit b
(s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.outlet ~> zip.in0
s2.outlet.map(_ * 10000) ~> zip.in1
zip.out
}
val (((f1, f2), (f3, f4)), result) = compositeSource2.toMat(Sink.head)(Keep.both).run()
Await.result(result, 3.seconds) should ===(55555555)
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(55)
Await.result(f3, 3.seconds) should ===(55)
Await.result(f4, 3.seconds) should ===(55)
}
}
}

View file

@ -14,7 +14,7 @@ class GraphMergeSpec extends TwoStreamsSetup {
override type Outputs = Int override type Outputs = Int
override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) {
val merge = b add Merge[Outputs](2) val merge = b add Merge[Outputs](2)
override def left: Inlet[Outputs] = merge.in(0) override def left: Inlet[Outputs] = merge.in(0)

View file

@ -14,7 +14,7 @@ class GraphPreferredMergeSpec extends TwoStreamsSetup {
override type Outputs = Int override type Outputs = Int
override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) {
val merge = b.add(MergePreferred[Outputs](1)) val merge = b.add(MergePreferred[Outputs](1))
override def left: Inlet[Outputs] = merge.preferred override def left: Inlet[Outputs] = merge.preferred

View file

@ -12,7 +12,7 @@ class GraphZipSpec extends TwoStreamsSetup {
override type Outputs = (Int, Int) override type Outputs = (Int, Int)
override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) {
val zip = b.add(Zip[Int, Int]()) val zip = b.add(Zip[Int, Int]())
override def left: Inlet[Int] = zip.in0 override def left: Inlet[Int] = zip.in0

View file

@ -10,7 +10,7 @@ class GraphZipWithSpec extends TwoStreamsSetup {
override type Outputs = Int override type Outputs = Int
override def fixture(b: FlowGraph.Builder): Fixture = new Fixture(b: FlowGraph.Builder) { override def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) {
val zip = b.add(ZipWith((_: Int) + (_: Int))) val zip = b.add(ZipWith((_: Int) + (_: Int)))
override def left: Inlet[Int] = zip.in0 override def left: Inlet[Int] = zip.in0
override def right: Inlet[Int] = zip.in1 override def right: Inlet[Int] = zip.in1

View file

@ -13,14 +13,14 @@ trait BidiFlowCreate {
import language.implicitConversions import language.implicitConversions
private implicit def p[A, B](pair: Pair[A, B]): (A, B) = pair.first -> pair.second private implicit def p[A, B](pair: Pair[A, B]): (A, B) = pair.first -> pair.second
def create[I1, O1, I2, O2](block: japi.Function[FlowGraph.Builder, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] = def create[I1, O1, I2, O2](block: japi.Function[FlowGraph.Builder[Unit], BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, Unit] =
new BidiFlow(scaladsl.BidiFlow() { b ⇒ block.apply(b.asJava) }) new BidiFlow(scaladsl.BidiFlow() { b ⇒ block.apply(b.asJava) })
def create[I1, O1, I2, O2, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] = def create[I1, O1, I2, O2, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, BidiShape[I1, O1, I2, O2]]): BidiFlow[I1, O1, I2, O2, M] =
new BidiFlow(scaladsl.BidiFlow(g1) { b ⇒ s => block.apply(b.asJava, s) }) new BidiFlow(scaladsl.BidiFlow(g1) { b ⇒ s => block.apply(b.asJava, s) })
[2..21#def create[I##1, O##1, I##2, O##2, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], [2..21#def create[I##1, O##1, I##2, O##2, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M],
block: japi.Function2[FlowGraph.Builder, [#S1#], BidiShape[I##1, O##1, I##2, O##2]]): BidiFlow[I##1, O##1, I##2, O##2, M] = block: japi.Function2[FlowGraph.Builder[M], [#S1#], BidiShape[I##1, O##1, I##2, O##2]]): BidiFlow[I##1, O##1, I##2, O##2, M] =
new BidiFlow(scaladsl.BidiFlow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# new BidiFlow(scaladsl.BidiFlow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })#
] ]

View file

@ -13,14 +13,14 @@ trait FlowCreate {
import language.implicitConversions import language.implicitConversions
private implicit def p[A, B](pair: Pair[A, B]): (A, B) = pair.first -> pair.second private implicit def p[A, B](pair: Pair[A, B]): (A, B) = pair.first -> pair.second
def create[I, O](block: japi.Function[FlowGraph.Builder, Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] = def create[I, O](block: japi.Function[FlowGraph.Builder[Unit], Inlet[I] Pair Outlet[O]]): Flow[I, O, Unit] =
new Flow(scaladsl.Flow() { b ⇒ block.apply(b.asJava) }) new Flow(scaladsl.Flow() { b ⇒ block.apply(b.asJava) })
def create[I, O, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = def create[I, O, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Inlet[I] Pair Outlet[O]]): Flow[I, O, M] =
new Flow(scaladsl.Flow(g1) { b ⇒ s => block.apply(b.asJava, s) }) new Flow(scaladsl.Flow(g1) { b ⇒ s => block.apply(b.asJava, s) })
[2..21#def create[I, O, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], [2..21#def create[I, O, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M],
block: japi.Function2[FlowGraph.Builder, [#S1#], Inlet[I] Pair Outlet[O]]): Flow[I, O, M] = block: japi.Function2[FlowGraph.Builder[M], [#S1#], Inlet[I] Pair Outlet[O]]): Flow[I, O, M] =
new Flow(scaladsl.Flow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# new Flow(scaladsl.Flow([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })#
] ]

View file

@ -12,24 +12,24 @@ trait GraphCreate {
import language.implicitConversions import language.implicitConversions
private implicit def r[M](run: scaladsl.RunnableFlow[M]): RunnableFlow[M] = new RunnableFlowAdapter(run) private implicit def r[M](run: scaladsl.RunnableFlow[M]): RunnableFlow[M] = new RunnableFlowAdapter(run)
def closed(block: japi.Procedure[FlowGraph.Builder]): RunnableFlow[Unit] = def closed(block: japi.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] =
scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) } scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) }
def partial[S <: Shape](block: japi.Function[FlowGraph.Builder, S]): Graph[S, Unit] = def partial[S <: Shape](block: japi.Function[FlowGraph.Builder[Unit], S]): Graph[S, Unit] =
scaladsl.FlowGraph.partial() { b ⇒ block.apply(b.asJava) } scaladsl.FlowGraph.partial() { b ⇒ block.apply(b.asJava) }
def closed[S1 <: Shape, M](g1: Graph[S1, M], block: japi.Procedure2[FlowGraph.Builder, S1]): RunnableFlow[M] = def closed[S1 <: Shape, M](g1: Graph[S1, M], block: japi.Procedure2[FlowGraph.Builder[M], S1]): RunnableFlow[M] =
scaladsl.FlowGraph.closed(g1) { b ⇒ s => block.apply(b.asJava, s) } scaladsl.FlowGraph.closed(g1) { b ⇒ s => block.apply(b.asJava, s) }
def partial[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M], block: japi.Function2[FlowGraph.Builder, S1, S]): Graph[S, M] = def partial[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M], block: japi.Function2[FlowGraph.Builder[M], S1, S]): Graph[S, M] =
scaladsl.FlowGraph.partial(g1) { b ⇒ s => block.apply(b.asJava, s) } scaladsl.FlowGraph.partial(g1) { b ⇒ s => block.apply(b.asJava, s) }
[2..21#def closed[[#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], [2..21#def closed[[#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M],
block: japi.Procedure2[FlowGraph.Builder, [#S1#]]): RunnableFlow[M] = block: japi.Procedure2[FlowGraph.Builder[M], [#S1#]]): RunnableFlow[M] =
scaladsl.FlowGraph.closed([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) } scaladsl.FlowGraph.closed([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }
def partial[[#S1 <: Shape#], S <: Shape, [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], def partial[[#S1 <: Shape#], S <: Shape, [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M],
block: japi.Function2[FlowGraph.Builder, [#S1#], S]): Graph[S, M] = block: japi.Function2[FlowGraph.Builder[M], [#S1#], S]): Graph[S, M] =
scaladsl.FlowGraph.partial([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }# scaladsl.FlowGraph.partial([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }#
] ]

View file

@ -13,18 +13,18 @@ trait SinkCreate {
* Creates a `Sink` by using a `FlowGraph.Builder` on a block that expects * Creates a `Sink` by using a `FlowGraph.Builder` on a block that expects
* a [[FlowGraph.Builder]] and returns the `UndefinedSource`. * a [[FlowGraph.Builder]] and returns the `UndefinedSource`.
*/ */
def create[T](block: japi.Function[FlowGraph.Builder, Inlet[T]]): Sink[T, Unit] = def create[T](block: japi.Function[FlowGraph.Builder[Unit], Inlet[T]]): Sink[T, Unit] =
new Sink(scaladsl.Sink() { b ⇒ block.apply(b.asJava) }) new Sink(scaladsl.Sink() { b ⇒ block.apply(b.asJava) })
/** /**
* Creates a `Sink` by using a `FlowGraph.Builder` on a block that expects * Creates a `Sink` by using a `FlowGraph.Builder` on a block that expects
* a [[FlowGraph.Builder]] and returns the `UndefinedSource`. * a [[FlowGraph.Builder]] and returns the `UndefinedSource`.
*/ */
def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Inlet[T]]): Sink[T, M] = def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Inlet[T]]): Sink[T, M] =
new Sink(scaladsl.Sink(g1) { b ⇒ s => block.apply(b.asJava, s) }) new Sink(scaladsl.Sink(g1) { b ⇒ s => block.apply(b.asJava, s) })
[2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], [2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M],
block: japi.Function2[FlowGraph.Builder, [#S1#], Inlet[T]]): Sink[T, M] = block: japi.Function2[FlowGraph.Builder[M], [#S1#], Inlet[T]]): Sink[T, M] =
new Sink(scaladsl.Sink([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# new Sink(scaladsl.Sink([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })#
] ]

View file

@ -9,14 +9,14 @@ import akka.stream.scaladsl.JavaConverters._
trait SourceCreate { trait SourceCreate {
def create[T](block: japi.Function[FlowGraph.Builder, Outlet[T]]): Source[T, Unit] = def create[T](block: japi.Function[FlowGraph.Builder[Unit], Outlet[T]]): Source[T, Unit] =
new Source(scaladsl.Source() { b ⇒ block.apply(b.asJava) }) new Source(scaladsl.Source() { b ⇒ block.apply(b.asJava) })
def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder, S, Outlet[T]]): Source[T, M] = def create[T, S <: Shape, M](g1: Graph[S, M], block: japi.Function2[FlowGraph.Builder[M], S, Outlet[T]]): Source[T, M] =
new Source(scaladsl.Source(g1) { b ⇒ s => block.apply(b.asJava, s) }) new Source(scaladsl.Source(g1) { b ⇒ s => block.apply(b.asJava, s) })
[2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M], [2..21#def create[T, [#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: japi.Function1[[#M1#], M],
block: japi.Function2[FlowGraph.Builder, [#S1#], Outlet[T]]): Source[T, M] = block: japi.Function2[FlowGraph.Builder[M], [#S1#], Outlet[T]]): Source[T, M] =
new Source(scaladsl.Source([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })# new Source(scaladsl.Source([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) })#
] ]

View file

@ -7,13 +7,13 @@ import akka.stream.{ Shape, Inlet, Outlet, Graph, BidiShape }
trait BidiFlowApply { trait BidiFlowApply {
def apply[I1, O1, I2, O2]()(block: FlowGraph.Builder ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = { def apply[I1, O1, I2, O2]()(block: FlowGraph.Builder[Unit] ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Unit] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val shape = block(builder) val shape = block(builder)
builder.buildBidiFlow(shape) builder.buildBidiFlow(shape)
} }
def apply[I1, O1, I2, O2, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { def apply[I1, O1, I2, O2, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val p = builder.add(g1, Keep.right) val p = builder.add(g1, Keep.right)
val shape = buildBlock(builder)(p) val shape = buildBlock(builder)(p)
@ -21,7 +21,7 @@ trait BidiFlowApply {
} }
[2..#def apply[I##1, O##1, I##2, O##2, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)( [2..#def apply[I##1, O##1, I##2, O##2, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(
buildBlock: FlowGraph.Builder => ([#g1.Shape#]) ⇒ BidiShape[I##1, O##1, I##2, O##2]): BidiFlow[I##1, O##1, I##2, O##2, Mat] = { buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ BidiShape[I##1, O##1, I##2, O##2]): BidiFlow[I##1, O##1, I##2, O##2, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val curried = combineMat.curried val curried = combineMat.curried
val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))

View file

@ -7,13 +7,13 @@ import akka.stream.{ Shape, Inlet, Outlet, Graph }
trait FlowApply { trait FlowApply {
def apply[I, O]()(block: FlowGraph.Builder ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit] = { def apply[I, O]()(block: FlowGraph.Builder[Unit] ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Unit] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val (inlet, outlet) = block(builder) val (inlet, outlet) = block(builder)
builder.buildFlow(inlet, outlet) builder.buildFlow(inlet, outlet)
} }
def apply[I, O, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { def apply[I, O, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val p = builder.add(g1, Keep.right) val p = builder.add(g1, Keep.right)
val (inlet, outlet) = buildBlock(builder)(p) val (inlet, outlet) = buildBlock(builder)(p)
@ -21,7 +21,7 @@ trait FlowApply {
} }
[2..#def apply[I, O, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)( [2..#def apply[I, O, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(
buildBlock: FlowGraph.Builder => ([#g1.Shape#]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = { buildBlock: FlowGraph.Builder[Mat] => ([#g1.Shape#]) ⇒ (Inlet[I], Outlet[O])): Flow[I, O, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val curried = combineMat.curried val curried = combineMat.curried
val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))

View file

@ -8,20 +8,20 @@ import akka.stream.{ Graph, Shape }
trait GraphApply { trait GraphApply {
def closed()(buildBlock: (FlowGraph.Builder) ⇒ Unit): RunnableFlow[Unit] = { def closed()(buildBlock: (FlowGraph.Builder[Unit]) ⇒ Unit): RunnableFlow[Unit] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
buildBlock(builder) buildBlock(builder)
builder.buildRunnable() builder.buildRunnable()
} }
def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder ⇒ (g1.Shape) ⇒ Unit): RunnableFlow[Mat] = { def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ Unit): RunnableFlow[Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val p1 = builder.add(g1) val p1 = builder.add(g1)
buildBlock(builder)(p1) buildBlock(builder)(p1)
builder.buildRunnable() builder.buildRunnable()
} }
def partial[S <: Shape]()(buildBlock: FlowGraph.Builder ⇒ S): Graph[S, Unit] = { def partial[S <: Shape]()(buildBlock: FlowGraph.Builder[Unit] ⇒ S): Graph[S, Unit] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val s = buildBlock(builder) val s = buildBlock(builder)
val mod = builder.module.wrap().replaceShape(s) val mod = builder.module.wrap().replaceShape(s)
@ -32,7 +32,7 @@ trait GraphApply {
} }
} }
def partial[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder ⇒ (g1.Shape) ⇒ S): Graph[S, Mat] = { def partial[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ S): Graph[S, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val s1 = builder.add(g1) val s1 = builder.add(g1)
val s = buildBlock(builder)(s1) val s = buildBlock(builder)(s1)
@ -46,7 +46,7 @@ trait GraphApply {
[2..#def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = { [2..#def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val curried = combineMat.curried val curried = combineMat.curried
val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))
@ -58,7 +58,7 @@ trait GraphApply {
] ]
[2..#def partial[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ S): Graph[S, Mat] = { [2..#def partial[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ S): Graph[S, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val curried = combineMat.curried val curried = combineMat.curried
val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))

View file

@ -7,13 +7,13 @@ import akka.stream.{ Inlet, Graph, Shape }
trait SinkApply { trait SinkApply {
def apply[In]()(buildBlock: FlowGraph.Builder => Inlet[In]): Sink[In, Unit] = { def apply[In]()(buildBlock: FlowGraph.Builder[Unit] => Inlet[In]): Sink[In, Unit] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val inlet = buildBlock(builder) val inlet = buildBlock(builder)
builder.buildSink(inlet) builder.buildSink(inlet)
} }
def apply[In, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) => Inlet[In]): Sink[In, Mat] = { def apply[In, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Inlet[In]): Sink[In, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val s = builder.add(g1, Keep.right) val s = builder.add(g1, Keep.right)
val inlet = buildBlock(builder)(s) val inlet = buildBlock(builder)(s)
@ -21,7 +21,7 @@ trait SinkApply {
} }
[2..#def apply[In, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)( [2..#def apply[In, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(
buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ Inlet[In]): Sink[In, Mat] = { buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Inlet[In]): Sink[In, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val curried = combineMat.curried val curried = combineMat.curried
val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) val s##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))

View file

@ -7,13 +7,13 @@ import akka.stream.{ Outlet, Shape, Graph }
trait SourceApply { trait SourceApply {
def apply[Out]()(buildBlock: FlowGraph.Builder => Outlet[Out]): Source[Out, Unit] = { def apply[Out]()(buildBlock: FlowGraph.Builder[Unit] => Outlet[Out]): Source[Out, Unit] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val port = buildBlock(builder) val port = buildBlock(builder)
builder.buildSource(port) builder.buildSource(port)
} }
def apply[Out, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder => (g1.Shape) => Outlet[Out]): Source[Out, Mat] = { def apply[Out, Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] => (g1.Shape) => Outlet[Out]): Source[Out, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val p = builder.add(g1, Keep.right) val p = builder.add(g1, Keep.right)
val port = buildBlock(builder)(p) val port = buildBlock(builder)(p)
@ -21,7 +21,7 @@ trait SourceApply {
} }
[2..#def apply[Out, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)( [2..#def apply[Out, [#M1#], Mat]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(
buildBlock: FlowGraph.Builder ⇒ ([#g1.Shape#]) ⇒ Outlet[Out]): Source[Out, Mat] = { buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Outlet[Out]): Source[Out, Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val curried = combineMat.curried val curried = combineMat.curried
val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1)) val p##1 = builder.add(g##1, (_: Any, m##1: M##1) ⇒ curried(m##1))

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl package akka.stream.impl
import scala.util.control.NonFatal import scala.util.control.NonFatal

View file

@ -3,11 +3,15 @@
*/ */
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference }
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.{ Keep, OperationAttributes } import akka.stream.scaladsl.{ Keep, OperationAttributes }
import akka.stream._ import akka.stream._
import org.reactivestreams.{ Subscription, Publisher, Subscriber } import org.reactivestreams.{ Subscription, Publisher, Subscriber }
import akka.event.Logging.simpleName import akka.event.Logging.simpleName
import scala.collection.mutable import scala.collection.mutable
import scala.util.control.NonFatal
/** /**
* INTERNAL API * INTERNAL API
@ -310,6 +314,148 @@ private[stream] class VirtualPublisher[T]() extends Publisher[T] {
override def subscribe(s: Subscriber[_ >: T]): Unit = realPublisher.subscribe(s) override def subscribe(s: Subscriber[_ >: T]): Unit = realPublisher.subscribe(s)
} }
/**
* INTERNAL API
*/
private[stream] case class MaterializedValueSource[M](
shape: SourceShape[M] = SourceShape[M](new Outlet[M]("Materialized.out")),
attributes: OperationAttributes = OperationAttributes.name("Materialized")) extends StreamLayout.Module {
override def subModules: Set[Module] = Set.empty
override def withAttributes(attr: OperationAttributes): Module = this.copy(shape = amendShape(attr), attributes = attr)
override def carbonCopy: Module = this.copy(shape = SourceShape(new Outlet[M]("Materialized.out")))
override def replaceShape(s: Shape): Module =
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of MaterializedValueSource")
def amendShape(attr: OperationAttributes): SourceShape[M] = {
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(outlet = new Outlet(name + ".out"))
}
}
}
/**
* INTERNAL API
*/
private[stream] object MaterializedValuePublisher {
final val NotRequested = 0
final val Requested = 1
final val Completed = 2
final val NoValue = new AnyRef
}
/**
* INTERNAL API
*/
private[stream] class MaterializedValuePublisher extends Publisher[Any] {
import MaterializedValuePublisher._
private val value = new AtomicReference[AnyRef](NoValue)
private val registeredSubscriber = new AtomicReference[Subscriber[_ >: Any]](null)
private val requestState = new AtomicInteger(NotRequested)
private def close(): Unit = {
requestState.set(Completed)
value.set(NoValue)
registeredSubscriber.set(null)
}
private def tryOrClose(block: Unit): Unit = {
try block catch {
case v: ReactiveStreamsCompliance.SpecViolation
close()
// What else can we do here?
case NonFatal(e)
val sub = registeredSubscriber.get()
if ((sub ne null) &&
requestState.compareAndSet(NotRequested, Completed) || requestState.compareAndSet(Requested, Completed)) {
sub.onError(e)
}
close()
throw e
}
}
def setValue(m: Any): Unit =
tryOrClose {
if (value.compareAndSet(NoValue, m.asInstanceOf[AnyRef]) && requestState.get() == Requested)
pushAndClose(m)
}
/*
* Both call-sites do a CAS on their "own" side and a GET on the other side. The possible overlaps
* are (removing symmetric cases where you can relabel A->B, B->A):
*
* A-CAS
* A-GET
* B-CAS
* B-GET - pushAndClose fires here
*
* A-CAS
* B-CAS
* A-GET - pushAndClose fires here
* B-GET - pushAndClose fires here
*
* A-CAS
* B-CAS
* B-GET - pushAndClose fires here
* A-GET - pushAndClose fires here
*
* The proof that there are no cases:
*
* - all permutations of 4 operations are 4! = 24
* - the operations of A and B are cannot be reordered, so there are 24 / (2 * 2) = 6 actual orderings
* - if we don't count cases which are a simple relabeling A->B, B->A, we get 6 / 2 = 3 reorderings
* which are all enumerated above.
*
* pushAndClose protects against double onNext by doing a CAS itself.
*/
private def pushAndClose(m: Any): Unit = {
if (requestState.compareAndSet(Requested, Completed)) {
val sub = registeredSubscriber.get()
ReactiveStreamsCompliance.tryOnNext(sub, m)
ReactiveStreamsCompliance.tryOnComplete(sub)
close()
}
}
override def subscribe(subscriber: Subscriber[_ >: Any]): Unit = {
tryOrClose {
ReactiveStreamsCompliance.requireNonNullSubscriber(subscriber)
if (registeredSubscriber.compareAndSet(null, subscriber)) {
ReactiveStreamsCompliance.tryOnSubscribe(subscriber, new Subscription {
override def cancel(): Unit = close()
override def request(n: Long): Unit = {
if (n <= 0) {
ReactiveStreamsCompliance.tryOnError(
subscriber,
ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
} else {
if (requestState.compareAndSet(NotRequested, Requested)) {
val m = value.get()
if (m ne NoValue) pushAndClose(m)
}
}
}
})
} else {
if (subscriber == registeredSubscriber.get())
ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
else
ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "MaterializedValuePublisher")
}
}
}
}
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -332,12 +478,25 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def materializeModule(module: Module, effectiveAttributes: OperationAttributes): Any = { protected def materializeModule(module: Module, effectiveAttributes: OperationAttributes): Any = {
val materializedValues = collection.mutable.HashMap.empty[Module, Any] val materializedValues = collection.mutable.HashMap.empty[Module, Any]
var materializedValuePublishers: List[MaterializedValuePublisher] = Nil
for (submodule module.subModules) { for (submodule module.subModules) {
val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes) val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes)
if (submodule.isAtomic) materializedValues.put(submodule, materializeAtomic(submodule, subEffectiveAttributes)) submodule match {
else materializedValues.put(submodule, materializeComposite(submodule, subEffectiveAttributes)) case mv: MaterializedValueSource[_]
val pub = new MaterializedValuePublisher
materializedValuePublishers ::= pub
assignPort(mv.shape.outlet, pub)
case atomic if atomic.isAtomic
materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes))
case composite
materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes))
} }
resolveMaterialized(module.materializedValueComputation, materializedValues) }
val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
materializedValuePublishers foreach { pub pub.setValue(mat) }
mat
} }
protected def materializeComposite(composite: Module, effectiveAttributes: OperationAttributes): Any = { protected def materializeComposite(composite: Module, effectiveAttributes: OperationAttributes): Any = {

View file

@ -4,7 +4,6 @@
package akka.stream.javadsl package akka.stream.javadsl
import akka.stream._ import akka.stream._
import akka.stream.scaladsl
import akka.japi.Pair import akka.japi.Pair
/** /**
@ -278,10 +277,10 @@ object FlowGraph {
* The [[Builder]] is mutable and not thread-safe, * The [[Builder]] is mutable and not thread-safe,
* thus you should construct your Graph and then share the constructed immutable [[FlowGraph]]. * thus you should construct your Graph and then share the constructed immutable [[FlowGraph]].
*/ */
def builder(): Builder = new Builder()(new scaladsl.FlowGraph.Builder) def builder[M](): Builder[M] = new Builder()(new scaladsl.FlowGraph.Builder[M])
class Builder()(private implicit val delegate: scaladsl.FlowGraph.Builder) { self final class Builder[Mat]()(private implicit val delegate: scaladsl.FlowGraph.Builder[Mat]) { self
import scaladsl.FlowGraph.Implicits._ import akka.stream.scaladsl.FlowGraph.Implicits._
def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to) def flow[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = delegate.addEdge(from, via.asScala, to)
@ -298,6 +297,22 @@ object FlowGraph {
def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala) def sink[T](sink: Sink[T, _]): Inlet[T] = delegate.add(sink.asScala)
/**
* Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized
* this outlet will emit exactly one element which is the materialized value. It is possible to expose this
* outlet as an externally accessible outlet of a [[Source]], [[Sink]], [[Flow]] or [[BidiFlow]].
*
* It is possible to call this method multiple times to get multiple [[Outlet]] instances if necessary. All of
* the outlets will emit the materialized value.
*
* Be careful to not to feed the result of this outlet to a stage that produces the materialized value itself (for
* example to a [[Sink#fold]] that contributes to the materialized value) since that might lead to an unresolvable
* dependency cycle.
*
* @return The outlet that will emit the materialized value.
*/
def matValue: Outlet[Mat] = delegate.matValue
def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat) def run(mat: FlowMaterializer): Unit = delegate.buildRunnable().run()(mat)
def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out) def from[T](out: Outlet[T]): ForwardOps[T] = new ForwardOps(out)
@ -314,26 +329,27 @@ object FlowGraph {
def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0)) def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0))
def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in) def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in)
class ForwardOps[T](out: Outlet[T]) { final class ForwardOps[T](out: Outlet[T]) {
def to(in: Inlet[T]): Builder = { out ~> in; self } def to(in: Inlet[T]): Builder[Mat] = { out ~> in; self }
def to[M](dst: Sink[T, M]): Builder = { out ~> dst.asScala; self } def to[M](dst: Sink[T, M]): Builder[Mat] = { out ~> dst.asScala; self }
def to(dst: SinkShape[T]): Builder = { out ~> dst; self } def to(dst: SinkShape[T]): Builder[Mat] = { out ~> dst; self }
def to[U](f: FlowShape[T, U]): Builder = { out ~> f; self } def to[U](f: FlowShape[T, U]): Builder[Mat] = { out ~> f; self }
def to[U](j: UniformFanInShape[T, U]): Builder = { out ~> j; self } def to[U](j: UniformFanInShape[T, U]): Builder[Mat] = { out ~> j; self }
def to[U](j: UniformFanOutShape[T, U]): Builder = { out ~> j; self } def to[U](j: UniformFanOutShape[T, U]): Builder[Mat] = { out ~> j; self }
def via[U, M](f: Flow[T, U, M]): ForwardOps[U] = from((out ~> f.asScala).outlet) def via[U, M](f: Flow[T, U, M]): ForwardOps[U] = from((out ~> f.asScala).outlet)
def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet) def via[U](f: FlowShape[T, U]): ForwardOps[U] = from((out ~> f).outlet)
def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) def via[U](j: UniformFanInShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet) def via[U](j: UniformFanOutShape[T, U]): ForwardOps[U] = from((out ~> j).outlet)
def out(): Outlet[T] = out
} }
class ReverseOps[T](out: Inlet[T]) { final class ReverseOps[T](out: Inlet[T]) {
def from(dst: Outlet[T]): Builder = { out <~ dst; self } def from(dst: Outlet[T]): Builder[Mat] = { out <~ dst; self }
def from[M](dst: Source[T, M]): Builder = { out <~ dst.asScala; self } def from[M](dst: Source[T, M]): Builder[Mat] = { out <~ dst.asScala; self }
def from(dst: SourceShape[T]): Builder = { out <~ dst; self } def from(dst: SourceShape[T]): Builder[Mat] = { out <~ dst; self }
def from[U](f: FlowShape[U, T]): Builder = { out <~ f; self } def from[U](f: FlowShape[U, T]): Builder[Mat] = { out <~ f; self }
def from[U](j: UniformFanInShape[U, T]): Builder = { out <~ j; self } def from[U](j: UniformFanInShape[U, T]): Builder[Mat] = { out <~ j; self }
def from[U](j: UniformFanOutShape[U, T]): Builder = { out <~ j; self } def from[U](j: UniformFanOutShape[U, T]): Builder[Mat] = { out <~ j; self }
def via[U, M](f: Flow[U, T, M]): ReverseOps[U] = to((out <~ f.asScala).inlet) def via[U, M](f: Flow[U, T, M]): ReverseOps[U] = to((out <~ f.asScala).inlet)
def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet) def via[U](f: FlowShape[U, T]): ReverseOps[U] = to((out <~ f).inlet)
def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet) def via[U](j: UniformFanInShape[U, T]): ReverseOps[U] = to((out <~ j).inlet)

View file

@ -171,10 +171,10 @@ object Concat {
object FlowGraph extends GraphApply { object FlowGraph extends GraphApply {
class Builder private[stream] () { class Builder[+M] private[stream] () {
private var moduleInProgress: Module = EmptyModule private var moduleInProgress: Module = EmptyModule
def addEdge[A, B, M](from: Outlet[A], via: Flow[A, B, M], to: Inlet[B]): Unit = { def addEdge[A, B, M2](from: Outlet[A], via: Flow[A, B, M2], to: Inlet[B]): Unit = {
val flowCopy = via.module.carbonCopy val flowCopy = via.module.carbonCopy
moduleInProgress = moduleInProgress =
moduleInProgress moduleInProgress
@ -215,6 +215,26 @@ object FlowGraph extends GraphApply {
def add[T](s: Source[T, _]): Outlet[T] = add(s: Graph[SourceShape[T], _]).outlet def add[T](s: Source[T, _]): Outlet[T] = add(s: Graph[SourceShape[T], _]).outlet
def add[T](s: Sink[T, _]): Inlet[T] = add(s: Graph[SinkShape[T], _]).inlet def add[T](s: Sink[T, _]): Inlet[T] = add(s: Graph[SinkShape[T], _]).inlet
/**
* Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized
* this outlet will emit exactly one element which is the materialized value. It is possible to expose this
* outlet as an externally accessible outlet of a [[Source]], [[Sink]], [[Flow]] or [[BidiFlow]].
*
* It is possible to call this method multiple times to get multiple [[Outlet]] instances if necessary. All of
* the outlets will emit the materialized value.
*
* Be careful to not to feed the result of this outlet to a stage that produces the materialized value itself (for
* example to a [[Sink#fold]] that contributes to the materialized value) since that might lead to an unresolvable
* dependency cycle.
*
* @return The outlet that will emit the materialized value.
*/
def matValue: Outlet[M] = {
val module = new MaterializedValueSource[Any]
moduleInProgress = moduleInProgress.grow(module)
module.shape.outlet.asInstanceOf[Outlet[M]]
}
private[stream] def andThen(port: OutPort, op: StageModule): Unit = { private[stream] def andThen(port: OutPort, op: StageModule): Unit = {
moduleInProgress = moduleInProgress =
moduleInProgress moduleInProgress
@ -228,7 +248,7 @@ object FlowGraph extends GraphApply {
"Cannot build the RunnableFlow because there are unconnected ports: " + "Cannot build the RunnableFlow because there are unconnected ports: " +
(moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", ")) (moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", "))
} }
new RunnableFlow(moduleInProgress) new RunnableFlow(moduleInProgress.wrap())
} }
private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = { private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = {
@ -239,7 +259,7 @@ object FlowGraph extends GraphApply {
s"Cannot build Source with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})") s"Cannot build Source with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.outPorts.head != outlet) if (moduleInProgress.outPorts.head != outlet)
throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}") throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}")
new Source(moduleInProgress.replaceShape(SourceShape(outlet))) new Source(moduleInProgress.replaceShape(SourceShape(outlet)).wrap())
} }
private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = { private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = {
@ -250,7 +270,7 @@ object FlowGraph extends GraphApply {
throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}") throw new IllegalArgumentException(s"provided Outlet $outlet does not equal the modules open Outlet ${moduleInProgress.outPorts.head}")
if (moduleInProgress.inPorts.head != inlet) if (moduleInProgress.inPorts.head != inlet)
throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}") throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}")
new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet))) new Flow(moduleInProgress.replaceShape(FlowShape(inlet, outlet)).wrap())
} }
private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = { private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = {
@ -261,7 +281,7 @@ object FlowGraph extends GraphApply {
throw new IllegalArgumentException(s"provided Outlets [${shape.outlets.mkString(",")}] does not equal the modules open Outlets [${moduleInProgress.outPorts.mkString(",")}]") throw new IllegalArgumentException(s"provided Outlets [${shape.outlets.mkString(",")}] does not equal the modules open Outlets [${moduleInProgress.outPorts.mkString(",")}]")
if (moduleInProgress.inPorts.toSet != shape.inlets.toSet) if (moduleInProgress.inPorts.toSet != shape.inlets.toSet)
throw new IllegalArgumentException(s"provided Inlets [${shape.inlets.mkString(",")}] does not equal the modules open Inlets [${moduleInProgress.inPorts.mkString(",")}]") throw new IllegalArgumentException(s"provided Inlets [${shape.inlets.mkString(",")}] does not equal the modules open Inlets [${moduleInProgress.inPorts.mkString(",")}]")
new BidiFlow(moduleInProgress.replaceShape(shape)) new BidiFlow(moduleInProgress.replaceShape(shape).wrap())
} }
private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = { private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = {
@ -272,7 +292,7 @@ object FlowGraph extends GraphApply {
s"Cannot build Sink with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})") s"Cannot build Sink with open inputs (${moduleInProgress.inPorts.mkString(",")}) and outputs (${moduleInProgress.outPorts.mkString(",")})")
if (moduleInProgress.inPorts.head != inlet) if (moduleInProgress.inPorts.head != inlet)
throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}") throw new IllegalArgumentException(s"provided Inlet $inlet does not equal the modules open Inlet ${moduleInProgress.inPorts.head}")
new Sink(moduleInProgress.replaceShape(SinkShape(inlet))) new Sink(moduleInProgress.replaceShape(SinkShape(inlet)).wrap())
} }
private[stream] def module: Module = moduleInProgress private[stream] def module: Module = moduleInProgress
@ -282,7 +302,7 @@ object FlowGraph extends GraphApply {
object Implicits { object Implicits {
@tailrec @tailrec
private[stream] def findOut[I, O](b: Builder, junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = { private[stream] def findOut[I, O](b: Builder[_], junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = {
if (n == junction.outArray.length) if (n == junction.outArray.length)
throw new IllegalArgumentException(s"no more outlets free on $junction") throw new IllegalArgumentException(s"no more outlets free on $junction")
else if (b.module.downstreams.contains(junction.out(n))) findOut(b, junction, n + 1) else if (b.module.downstreams.contains(junction.out(n))) findOut(b, junction, n + 1)
@ -290,7 +310,7 @@ object FlowGraph extends GraphApply {
} }
@tailrec @tailrec
private[stream] def findIn[I, O](b: Builder, junction: UniformFanInShape[I, O], n: Int): Inlet[I] = { private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = {
if (n == junction.inArray.length) if (n == junction.inArray.length)
throw new IllegalArgumentException(s"no more inlets free on $junction") throw new IllegalArgumentException(s"no more inlets free on $junction")
else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1) else if (b.module.upstreams.contains(junction.in(n))) findIn(b, junction, n + 1)
@ -298,19 +318,19 @@ object FlowGraph extends GraphApply {
} }
trait CombinerBase[T] extends Any { trait CombinerBase[T] extends Any {
def importAndGetPort(b: Builder): Outlet[T] def importAndGetPort(b: Builder[_]): Outlet[T]
def ~>(to: Inlet[T])(implicit b: Builder): Unit = { def ~>(to: Inlet[T])(implicit b: Builder[_]): Unit = {
b.addEdge(importAndGetPort(b), to) b.addEdge(importAndGetPort(b), to)
} }
def ~>[Out](via: Flow[T, Out, _])(implicit b: Builder): PortOps[Out, Unit] = { def ~>[Out](via: Flow[T, Out, _])(implicit b: Builder[_]): PortOps[Out, Unit] = {
val s = b.add(via) val s = b.add(via)
b.addEdge(importAndGetPort(b), s.inlet) b.addEdge(importAndGetPort(b), s.inlet)
s.outlet s.outlet
} }
def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder): PortOps[Out, Unit] = { def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
def bind(n: Int): Unit = { def bind(n: Int): Unit = {
if (n == junction.inArray.length) if (n == junction.inArray.length)
throw new IllegalArgumentException(s"no more inlets free on $junction") throw new IllegalArgumentException(s"no more inlets free on $junction")
@ -321,7 +341,7 @@ object FlowGraph extends GraphApply {
junction.out junction.out
} }
def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder): PortOps[Out, Unit] = { def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
b.addEdge(importAndGetPort(b), junction.in) b.addEdge(importAndGetPort(b), junction.in)
try findOut(b, junction, 0) try findOut(b, junction, 0)
catch { catch {
@ -329,34 +349,34 @@ object FlowGraph extends GraphApply {
} }
} }
def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder): PortOps[Out, Unit] = { def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
b.addEdge(importAndGetPort(b), flow.inlet) b.addEdge(importAndGetPort(b), flow.inlet)
flow.outlet flow.outlet
} }
def ~>(to: Sink[T, _])(implicit b: Builder): Unit = { def ~>(to: Sink[T, _])(implicit b: Builder[_]): Unit = {
b.addEdge(importAndGetPort(b), b.add(to)) b.addEdge(importAndGetPort(b), b.add(to))
} }
def ~>(to: SinkShape[T])(implicit b: Builder): Unit = { def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = {
b.addEdge(importAndGetPort(b), to.inlet) b.addEdge(importAndGetPort(b), to.inlet)
} }
} }
trait ReverseCombinerBase[T] extends Any { trait ReverseCombinerBase[T] extends Any {
def importAndGetPortReverse(b: Builder): Inlet[T] def importAndGetPortReverse(b: Builder[_]): Inlet[T]
def <~(from: Outlet[T])(implicit b: Builder): Unit = { def <~(from: Outlet[T])(implicit b: Builder[_]): Unit = {
b.addEdge(from, importAndGetPortReverse(b)) b.addEdge(from, importAndGetPortReverse(b))
} }
def <~[In](via: Flow[In, T, _])(implicit b: Builder): ReversePortOps[In] = { def <~[In](via: Flow[In, T, _])(implicit b: Builder[_]): ReversePortOps[In] = {
val s = b.add(via) val s = b.add(via)
b.addEdge(s.outlet, importAndGetPortReverse(b)) b.addEdge(s.outlet, importAndGetPortReverse(b))
s.inlet s.inlet
} }
def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder): ReversePortOps[In] = { def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
def bind(n: Int): Unit = { def bind(n: Int): Unit = {
if (n == junction.outArray.length) if (n == junction.outArray.length)
throw new IllegalArgumentException(s"no more outlets free on $junction") throw new IllegalArgumentException(s"no more outlets free on $junction")
@ -367,7 +387,7 @@ object FlowGraph extends GraphApply {
junction.in junction.in
} }
def <~[In](junction: UniformFanInShape[In, T])(implicit b: Builder): ReversePortOps[In] = { def <~[In](junction: UniformFanInShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
b.addEdge(junction.out, importAndGetPortReverse(b)) b.addEdge(junction.out, importAndGetPortReverse(b))
try findIn(b, junction, 0) try findIn(b, junction, 0)
catch { catch {
@ -375,21 +395,21 @@ object FlowGraph extends GraphApply {
} }
} }
def <~[In](flow: FlowShape[In, T])(implicit b: Builder): ReversePortOps[In] = { def <~[In](flow: FlowShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
b.addEdge(flow.outlet, importAndGetPortReverse(b)) b.addEdge(flow.outlet, importAndGetPortReverse(b))
flow.inlet flow.inlet
} }
def <~(from: Source[T, _])(implicit b: Builder): Unit = { def <~(from: Source[T, _])(implicit b: Builder[_]): Unit = {
b.addEdge(b.add(from), importAndGetPortReverse(b)) b.addEdge(b.add(from), importAndGetPortReverse(b))
} }
def <~(from: SourceShape[T])(implicit b: Builder): Unit = { def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit = {
b.addEdge(from.outlet, importAndGetPortReverse(b)) b.addEdge(from.outlet, importAndGetPortReverse(b))
} }
} }
class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder) extends FlowOps[Out, Mat] with CombinerBase[Out] { class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder[_]) extends FlowOps[Out, Mat] with CombinerBase[Out] {
override type Repr[+O, +M] = PortOps[O, M] @uncheckedVariance override type Repr[+O, +M] = PortOps[O, M] @uncheckedVariance
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
@ -406,55 +426,55 @@ object FlowGraph extends GraphApply {
new PortOps(op.shape.outlet.asInstanceOf[Outlet[U]], b) new PortOps(op.shape.outlet.asInstanceOf[Outlet[U]], b)
} }
override def importAndGetPort(b: Builder): Outlet[Out] = outlet override def importAndGetPort(b: Builder[_]): Outlet[Out] = outlet
} }
class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) { class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) {
override def importAndGetPort(b: Builder): Outlet[Out] = throw new IllegalArgumentException(msg) override def importAndGetPort(b: Builder[_]): Outlet[Out] = throw new IllegalArgumentException(msg)
} }
implicit class ReversePortOps[In](val inlet: Inlet[In]) extends ReverseCombinerBase[In] { implicit class ReversePortOps[In](val inlet: Inlet[In]) extends ReverseCombinerBase[In] {
override def importAndGetPortReverse(b: Builder): Inlet[In] = inlet override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = inlet
} }
class DisabledReversePortOps[In](msg: String) extends ReversePortOps[In](null) { class DisabledReversePortOps[In](msg: String) extends ReversePortOps[In](null) {
override def importAndGetPortReverse(b: Builder): Inlet[In] = throw new IllegalArgumentException(msg) override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = throw new IllegalArgumentException(msg)
} }
implicit class FanInOps[In, Out](val j: UniformFanInShape[In, Out]) extends AnyVal with CombinerBase[Out] with ReverseCombinerBase[In] { implicit class FanInOps[In, Out](val j: UniformFanInShape[In, Out]) extends AnyVal with CombinerBase[Out] with ReverseCombinerBase[In] {
override def importAndGetPort(b: Builder): Outlet[Out] = j.out override def importAndGetPort(b: Builder[_]): Outlet[Out] = j.out
override def importAndGetPortReverse(b: Builder): Inlet[In] = findIn(b, j, 0) override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = findIn(b, j, 0)
} }
implicit class FanOutOps[In, Out](val j: UniformFanOutShape[In, Out]) extends AnyVal with ReverseCombinerBase[In] { implicit class FanOutOps[In, Out](val j: UniformFanOutShape[In, Out]) extends AnyVal with ReverseCombinerBase[In] {
override def importAndGetPortReverse(b: Builder): Inlet[In] = j.in override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = j.in
} }
implicit class SinkArrow[T](val s: Sink[T, _]) extends AnyVal with ReverseCombinerBase[T] { implicit class SinkArrow[T](val s: Sink[T, _]) extends AnyVal with ReverseCombinerBase[T] {
override def importAndGetPortReverse(b: Builder): Inlet[T] = b.add(s) override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = b.add(s)
} }
implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] { implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] {
override def importAndGetPortReverse(b: Builder): Inlet[T] = s.inlet override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = s.inlet
} }
implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] { implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] {
override def importAndGetPortReverse(b: Builder): Inlet[I] = f.inlet override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet
def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder): BidiShape[O, O2, I2, I] = { def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
val shape = b.add(bidi) val shape = b.add(bidi)
b.addEdge(f.outlet, shape.in1) b.addEdge(f.outlet, shape.in1)
b.addEdge(shape.out2, f.inlet) b.addEdge(shape.out2, f.inlet)
shape shape
} }
def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder): BidiShape[O, O2, I2, I] = { def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
b.addEdge(f.outlet, bidi.in1) b.addEdge(f.outlet, bidi.in1)
b.addEdge(bidi.out2, f.inlet) b.addEdge(bidi.out2, f.inlet)
bidi bidi
} }
def <~>[M](flow: Flow[O, I, M])(implicit b: Builder): Unit = { def <~>[M](flow: Flow[O, I, M])(implicit b: Builder[_]): Unit = {
val shape = b.add(flow) val shape = b.add(flow)
b.addEdge(shape.outlet, f.inlet) b.addEdge(shape.outlet, f.inlet)
b.addEdge(f.outlet, shape.inlet) b.addEdge(f.outlet, shape.inlet)
@ -462,7 +482,7 @@ object FlowGraph extends GraphApply {
} }
implicit class FlowArrow[I, O, M](val f: Flow[I, O, M]) extends AnyVal { implicit class FlowArrow[I, O, M](val f: Flow[I, O, M]) extends AnyVal {
def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder): BidiShape[O, O2, I2, I] = { def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
val shape = b.add(bidi) val shape = b.add(bidi)
val flow = b.add(f) val flow = b.add(f)
b.addEdge(flow.outlet, shape.in1) b.addEdge(flow.outlet, shape.in1)
@ -470,14 +490,14 @@ object FlowGraph extends GraphApply {
shape shape
} }
def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder): BidiShape[O, O2, I2, I] = { def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
val flow = b.add(f) val flow = b.add(f)
b.addEdge(flow.outlet, bidi.in1) b.addEdge(flow.outlet, bidi.in1)
b.addEdge(bidi.out2, flow.inlet) b.addEdge(bidi.out2, flow.inlet)
bidi bidi
} }
def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder): Unit = { def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder[_]): Unit = {
val shape = b.add(flow) val shape = b.add(flow)
val ff = b.add(f) val ff = b.add(f)
b.addEdge(shape.outlet, ff.inlet) b.addEdge(shape.outlet, ff.inlet)
@ -486,25 +506,25 @@ object FlowGraph extends GraphApply {
} }
implicit class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal { implicit class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal {
def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder): BidiShape[O1, O3, I3, I2] = { def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
b.addEdge(bidi.out1, other.in1) b.addEdge(bidi.out1, other.in1)
b.addEdge(other.out2, bidi.in2) b.addEdge(other.out2, bidi.in2)
other other
} }
def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder): BidiShape[O1, O3, I3, I2] = { def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
val other = b.add(otherFlow) val other = b.add(otherFlow)
b.addEdge(bidi.out1, other.in1) b.addEdge(bidi.out1, other.in1)
b.addEdge(other.out2, bidi.in2) b.addEdge(other.out2, bidi.in2)
other other
} }
def <~>(flow: FlowShape[O1, I2])(implicit b: Builder): Unit = { def <~>(flow: FlowShape[O1, I2])(implicit b: Builder[_]): Unit = {
b.addEdge(bidi.out1, flow.inlet) b.addEdge(bidi.out1, flow.inlet)
b.addEdge(flow.outlet, bidi.in2) b.addEdge(flow.outlet, bidi.in2)
} }
def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder): Unit = { def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder[_]): Unit = {
val flow = b.add(f) val flow = b.add(f)
b.addEdge(bidi.out1, flow.inlet) b.addEdge(bidi.out1, flow.inlet)
b.addEdge(flow.outlet, bidi.in2) b.addEdge(flow.outlet, bidi.in2)
@ -513,21 +533,21 @@ object FlowGraph extends GraphApply {
import scala.language.implicitConversions import scala.language.implicitConversions
implicit def port2flow[T](from: Outlet[T])(implicit b: Builder): PortOps[T, Unit] = implicit def port2flow[T](from: Outlet[T])(implicit b: Builder[_]): PortOps[T, Unit] =
new PortOps(from, b) new PortOps(from, b)
implicit def fanOut2flow[I, O](j: UniformFanOutShape[I, O])(implicit b: Builder): PortOps[O, Unit] = implicit def fanOut2flow[I, O](j: UniformFanOutShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] =
new PortOps(findOut(b, j, 0), b) new PortOps(findOut(b, j, 0), b)
implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder): PortOps[O, Unit] = implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] =
new PortOps(f.outlet, b) new PortOps(f.outlet, b)
implicit class SourceArrow[T](val s: Source[T, _]) extends AnyVal with CombinerBase[T] { implicit class SourceArrow[T](val s: Source[T, _]) extends AnyVal with CombinerBase[T] {
override def importAndGetPort(b: Builder): Outlet[T] = b.add(s) override def importAndGetPort(b: Builder[_]): Outlet[T] = b.add(s)
} }
implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] { implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {
override def importAndGetPort(b: Builder): Outlet[T] = s.outlet override def importAndGetPort(b: Builder[_]): Outlet[T] = s.outlet
} }
} }

View file

@ -23,8 +23,8 @@ private[akka] object JavaConverters {
implicit final class AddAsJavaSink[In, Mat](val sink: scaladsl.Sink[In, Mat]) extends AnyVal { implicit final class AddAsJavaSink[In, Mat](val sink: scaladsl.Sink[In, Mat]) extends AnyVal {
def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink) def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(sink)
} }
implicit final class AsAsJavaFlowGraphBuilder[Out](val builder: scaladsl.FlowGraph.Builder) extends AnyVal { implicit final class AsAsJavaFlowGraphBuilder[Out, Mat](val builder: scaladsl.FlowGraph.Builder[Mat]) extends AnyVal {
def asJava: javadsl.FlowGraph.Builder = new javadsl.FlowGraph.Builder()(builder) def asJava: javadsl.FlowGraph.Builder[Mat] = new javadsl.FlowGraph.Builder()(builder)
} }
implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal { implicit final class AddAsScalaSource[Out, Mat](val source: javadsl.Source[Out, Mat]) extends AnyVal {
@ -39,7 +39,7 @@ private[akka] object JavaConverters {
implicit final class AddAsScalaSink[In, Mat](val sink: javadsl.Sink[In, Mat]) extends AnyVal { implicit final class AddAsScalaSink[In, Mat](val sink: javadsl.Sink[In, Mat]) extends AnyVal {
def asScala: scaladsl.Sink[In, Mat] = sink.asScala def asScala: scaladsl.Sink[In, Mat] = sink.asScala
} }
implicit final class AsAsScalaFlowGraphBuilder[Out](val builder: javadsl.FlowGraph.Builder) extends AnyVal { implicit final class AsAsScalaFlowGraphBuilder[Out, Mat](val builder: javadsl.FlowGraph.Builder[Mat]) extends AnyVal {
def asScala: FlowGraph.Builder = builder.asScala def asScala: FlowGraph.Builder[Mat] = builder.asScala
} }
} }