diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst new file mode 100644 index 0000000000..a2ab83a929 --- /dev/null +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -0,0 +1,211 @@ +.. _stream-graph-java: + +################### +Working with Graphs +################### + +In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are, instead they are +written in a more graph-resembling DSL which aims to make translating graph drawings (e.g. from notes taken +from design discussions, or illustrations in protocol specifications) to and from code simpler. In this section we'll +dive into the multiple ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them. + +Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations. +Considering linear Flows to be like roads, we can picture graph operations as junctions: multiple flows being connected at a single point. +Some graph operations which are common enough and fit the linear style of Flows, such as ``concat`` (which concatenates two +streams, such that the second one is consumed after the first one has completed), may have shorthand methods defined on +:class:`Flow` or :class:`Source` themselves, however you should keep in mind that those are also implemented as graph junctions. + +.. _flow-graph-java: + +Constructing Flow Graphs +------------------------ +Flow graphs are built from simple Flows which serve as the linear connections within the graphs as well as junctions +which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour +and making them explicit elements these elements should be rather straightforward to use. + +Akka Streams currently provide these junctions: + +* **Fan-out** + + - ``Broadcast`` – (1 input, n outputs) signals each output given an input signal, + - ``Balance`` – (1 input => n outputs), signals one of its output ports given an input signal, + - ``UnZip`` – (1 input => 2 outputs), which is a specialized element which is able to split a stream of ``Pair`` into two streams one type ``A`` and one of type ``B``, + - ``FlexiRoute`` – (1 input, n outputs), which enables writing custom fan out elements using a simple DSL, + +* **Fan-in** + + - ``Merge`` – (n inputs , 1 output), picks signals randomly from inputs pushing them one by one to its output, + - ``MergePreferred`` – like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``, + - ``ZipWith`` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output, + - ``Zip`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into a ``Pair`` stream, + - ``Concat`` – (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters, + - ``FlexiMerge`` – (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL. + +One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is +simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating +the below hand drawn graph into Akka Streams: + +.. image:: ../images/simple-graph-example.png + +Such graph is simple to translate to the Graph DSL since each linear element corresponds to a :class:`Flow`, +and each circle corresponds to either a :class:`Junction` or a :class:`Source` or :class:`Sink` if it is beginning +or ending a :class:`Flow`. Those are connected with the ``addEdge`` method of the :class:`FlowGraphBuilder`. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#simple-flow-graph + +.. note:: + Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph + refers to the same location in the resulting graph). + + +By looking at the snippets above, it should be apparent that the :class:`FlowGraphBuilder` object is *mutable*. +The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles. +Once the FlowGraph has been constructed though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*. + +Linear Flows however are always immutable and appending an operation to a Flow always returns a new Flow instance. +This means that you can safely re-use one given Flow in multiple places in a processing graph. In the example below +we prepare a graph that consists of two parallel streams, in which we re-use the same instance of :class:`Flow`, +yet it will properly be materialized as two connections between the corresponding Sources and Sinks: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowGraphDocTest.java#flow-graph-reusing-a-flow + +.. _partial-flow-graph-java: + +Constructing and combining Partial Flow Graphs +---------------------------------------------- +Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct +all of its different phases in different places and in the end connect them all into a complete graph and run it. + +This can be achieved using :class:`PartialFlowGraph`. The reason of representing it as a different type is that a +:class:`FlowGraph` requires all ports to be connected, and if they are not it will throw an exception at construction +time, which helps to avoid simple wiring errors while working with graphs. A partial flow graph however does not perform +this validation, and allows graphs that are not yet fully connected. + +A :class:`PartialFlowGraph` is defined as a :class:`FlowGraph` which contains so called "undefined elements", +such as ``UndefinedSink`` or ``UndefinedSource``, which can be reused and plugged into by consumers of that +partial flow graph. Let's imagine we want to provide users with a specialized element that given 3 inputs will pick +the greatest int value of each zipped triple. We'll want to expose 3 input ports (undefined sources) and one output port +(undefined sink). + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#simple-partial-flow-graph + +As you can see, first we construct the partial graph that contains all the zipping and comparing of stream +elements, then we import it (all of its nodes and connections) explicitly to the :class:`FlowGraph` instance in which all +the undefined elements are rewired to real sources and sinks. The graph can then be run and yields the expected result. + +.. warning:: + Please note that a :class:`FlowGraph` is not able to provide compile time type-safety about whether or not all + elements have been properly connected - this validation is performed as a runtime check during the graph's instantiation. + +.. _constructing-sources-sinks-flows-from-partial-graphs-java: + +Constructing Sources, Sinks and Flows from Partial Graphs +--------------------------------------------------------- +Instead of treating a :class:`PartialFlowGraph` as simply a collection of flows and junctions which may not yet all be +connected it is sometimes useful to expose such a complex graph as a simpler structure, +such as a :class:`Source`, :class:`Sink` or :class:`Flow`. + +In fact, these concepts can be easily expressed as special cases of a partially connected graph: + +* :class:`Source` is a partial flow graph with *exactly one* :class:`UndefinedSink`, +* :class:`Sink` is a partial flow graph with *exactly one* :class:`UndefinedSource`, +* :class:`Flow` is a partial flow graph with *exactly one* :class:`UndefinedSource` and *exactly one* :class:`UndefinedSource`. + +Being able to hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to easily create one +complex element and from there on treat it as simple compound stage for linear computations. + +In order to create a Source from a partial flow graph ``Source`` provides a special apply method that takes a function +that must return an ``UndefinedSink``. This undefined sink will become "the sink that must be attached before this Source +can run". Refer to the example below, in which we create a Source that zips together two numbers, to see this graph +construction in action: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#source-from-partial-flow-graph + +Similarly the same can be done for a ``Sink``, in which case the returned value must be an ``UndefinedSource``. +For defining a ``Flow`` we need to expose both an undefined source and sink: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#flow-from-partial-flow-graph + +.. _graph-cycles-java: + +Graph cycles, liveness and deadlocks +------------------------------------ + +By default :class:`FlowGraph` does not allow (or to be precise, its builder does not allow) the creation of cycles. +The reason for this is that cycles need special considerations to avoid potential deadlocks and other liveness issues. +This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing +graphs. + +The first example demonstrates a graph that contains a naive cycle (the presence of cycles is enabled by calling +``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements +to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via +a ``Merge`` junction. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphCyclesDocTest.java#deadlocked + +Running this we observe that after a few numbers have been printed, no more elements are logged to the console - +all processing stops after some time. After some investigation we observe that: + +* through merging from ``source`` we increase the number of elements flowing in the cycle +* by broadcasting back to the cycle we do not decrease the number of elements in the cycle + +Since Akka Streams (and Reactive Streams in general) guarantee bounded processing (see the "Buffering" section for more +details) it means that only a bounded number of elements are buffered over any time span. Since our cycle gains more and +more elements, eventually all of its internal buffers become full, backpressuring ``source`` forever. To be able +to process more elements from ``source`` elements would need to leave the cycle somehow. + +If we modify our feedback loop by replacing the ``Merge`` junction with a ``MergePreferred`` we can avoid the deadlock. +``MergePreferred`` is unfair as it always tries to consume from a preferred input port if there are elements available +before trying the other lower priority input ports. Since we feed back through the preferred port it is always guaranteed +that the elements in the cycles can flow. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphCyclesDocTest.java#unfair + +If we run the example we see that the same sequence of numbers are printed +over and over again, but the processing does not stop. Hence, we avoided the deadlock, but ``source`` is still +back-pressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple +of initial elements from ``source``. + +.. note:: + What we see here is that in certain cases we need to choose between boundedness and liveness. Our first example would + not deadlock if there would be an infinite buffer in the loop, or vice versa, if the elements in the cycle would + be balanced (as many elements are removed as many are injected) then there would be no deadlock. + +To make our cycle both live (not deadlocking) and fair we can introduce a dropping element on the feedback arc. In this +case we chose the ``buffer()`` operation giving it a dropping strategy ``OverflowStrategy.dropHead``. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphCyclesDocTest.java#dropping + +If we run this example we see that + +* The flow of elements does not stop, there are always elements printed +* We see that some of the numbers are printed several times over time (due to the feedback loop) but on average + the numbers are increasing in the long term + +This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles +(cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to +define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after +all buffer space has been consumed. + +As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We +circumvented this issue by adding a dropping element, but now we want to build a cycle that is balanced from +the beginning instead. To achieve this we modify our first graph by replacing the ``Merge`` junction with a ``ZipWith``. +Since ``ZipWith`` takes one element from ``source`` *and* from the feedback arc to inject one element into the cycle, +we maintain the balance of elements. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphCyclesDocTest.java#zipping-dead + +Still, when we try to run the example it turns out that no element is printed at all! After some investigation we +realize that: + +* In order to get the first element from ``source`` into the cycle we need an already existing element in the cycle +* In order to get an initial element in the cycle we need an element from ``source`` + +These two conditions are a typical "chicken-and-egg" problem. The solution is to inject an initial +element into the cycle that is independent from ``source``. We do this by using a ``Concat`` junction on the backwards +arc that injects a single element using ``Source.single``. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphCyclesDocTest.java#zipping-live + +When we run the above example we see that processing starts and never stops. The important takeaway from this example +is that balanced cycles often need an initial "kick-off" element to be injected into the cycle. diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index 6b5a218222..4d5194e838 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -105,8 +105,8 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { } val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head) - Await.result(firstPair, 300.millis) should equal(1 → 2) //#source-from-partial-flow-graph + Await.result(firstPair, 300.millis) should equal(1 -> 2) } "build flow from partial flow graph" in { @@ -140,6 +140,6 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { //#flow-from-partial-flow-graph // format: ON - Await.result(matSink, 300.millis) should equal(1 → "1") + Await.result(matSink, 300.millis) should equal(1 -> "1") } } diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index 78fb5989a1..bb203b86ec 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -50,7 +50,7 @@ the below hand drawn graph into Akka Streams: Such graph is simple to translate to the Graph DSL since each linear element corresponds to a :class:`Flow`, and each circle corresponds to either a :class:`Junction` or a :class:`Source` or :class:`Sink` if it is beginning or ending a :class:`Flow`. Junctions must always be created with defined type parameters, as otherwise the ``Nothing`` type -will be inferred and +will be inferred. .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 375240184c..4bd8e3c990 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -22,6 +22,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { ("apply" → "create") :: ("apply" → "of") :: ("apply" → "from") :: + ("apply" -> "fromGraph") :: Nil // format: OFF @@ -88,8 +89,18 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { // here be dragons... - private def getJMethods(jClass: Class[_]): Array[Method] = jClass.getDeclaredMethods.filterNot(javaIgnore contains _.getName) - private def getSMethods(sClass: Class[_]): Array[Method] = sClass.getMethods.filterNot(scalaIgnore contains _.getName) + private def getJMethods(jClass: Class[_]): Array[Method] = jClass.getDeclaredMethods.filterNot(javaIgnore contains _.getName).filter(include) + private def getSMethods(sClass: Class[_]): Array[Method] = sClass.getMethods.filterNot(scalaIgnore contains _.getName).filter(include) + + private def include(m: Method): Boolean = { + if (m.getDeclaringClass == akka.stream.scaladsl.Source.getClass + && m.getName == "apply" + && m.getParameterTypes.length == 1 + && m.getParameterTypes()(0) == classOf[scala.Function1[akka.stream.scaladsl.FlowGraphBuilder, akka.stream.scaladsl.UndefinedSink[_]]]) + false // conflict between two Source.apply(Function1) + else + true + } def runSpec(sClass: Class[_], jClass: Class[_]) { val jMethods = getJMethods(jClass) @@ -191,4 +202,4 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { private def provide = afterWord("provide") -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala index c3b2c0a260..0bf1332d92 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala @@ -99,6 +99,10 @@ object MergePreferred { */ def create[T](clazz: Class[T], attributes: OperationAttributes): MergePreferred[T] = create(attributes) + + class Preferred[T] private[akka] (delegate: scaladsl.MergePreferred.Preferred[T]) extends JunctionInPort[T] { + override def asScala: scaladsl.JunctionInPort[T] = delegate + } } /** @@ -115,6 +119,8 @@ object MergePreferred { */ class MergePreferred[T](delegate: scaladsl.MergePreferred[T]) extends javadsl.Junction[T] { override def asScala: scaladsl.MergePreferred[T] = delegate + + val preferred = new MergePreferred.Preferred[T](delegate.preferred) } object Broadcast { @@ -409,13 +415,21 @@ final class UndefinedSink[-T](delegate: scaladsl.UndefinedSink[T]) { object FlowGraph { /** - * Start building a [[FlowGraph]]. + * Start building a [[FlowGraph]] or [[PartialFlowGraph]]. * * The [[FlowGraphBuilder]] is mutable and not thread-safe, * thus you should construct your Graph and then share the constructed immutable [[FlowGraph]]. */ def builder(): FlowGraphBuilder = new FlowGraphBuilder() + /** + * Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`. + * For example you can attach undefined sources and sinks with + * [[FlowGraphBuilder#attachSource]] and [[FlowGraphBuilder#attachSink]] + */ + def builder(partialFlowGraph: PartialFlowGraph): FlowGraphBuilder = + new FlowGraphBuilder(partialFlowGraph) + } /** @@ -424,6 +438,15 @@ object FlowGraph { */ class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) { + /** + * Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`. + * For example you can attach undefined sources and sinks with + * [[#attachSource]] and [[#attachSink]] + */ + def this(partialFlowGraph: PartialFlowGraph) { + this(new scaladsl.FlowGraphBuilder(partialFlowGraph.asScala)) + } + def this() { this(new scaladsl.FlowGraphBuilder()) } @@ -431,61 +454,78 @@ class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) { /** Converts this Java DSL element to it's Scala DSL counterpart. */ def asScala: scaladsl.FlowGraphBuilder = b - def addEdge[In, Out](source: javadsl.UndefinedSource[In], flow: javadsl.Flow[In, Out], junctionIn: javadsl.JunctionInPort[Out]) = { + def addEdge[In, Out](source: javadsl.UndefinedSource[In], flow: javadsl.Flow[In, Out], junctionIn: javadsl.JunctionInPort[Out]): FlowGraphBuilder = { b.addEdge(source.asScala, flow.asScala, junctionIn.asScala) this } + def addEdge[T](source: javadsl.UndefinedSource[T], junctionIn: javadsl.JunctionInPort[T]) = + addEdge[T, T](source, javadsl.Flow.empty[T], junctionIn); + def addEdge[In, Out](junctionOut: javadsl.JunctionOutPort[In], flow: javadsl.Flow[In, Out], sink: javadsl.UndefinedSink[Out]): FlowGraphBuilder = { b.addEdge(junctionOut.asScala, flow.asScala, sink.asScala) this } + def addEdge[T](junctionOut: javadsl.JunctionOutPort[T], sink: javadsl.UndefinedSink[T]): FlowGraphBuilder = + addEdge[T, T](junctionOut, javadsl.Flow.empty[T], sink); + def addEdge[In, Out](junctionOut: javadsl.JunctionOutPort[In], flow: javadsl.Flow[In, Out], junctionIn: javadsl.JunctionInPort[Out]): FlowGraphBuilder = { b.addEdge(junctionOut.asScala, flow.asScala, junctionIn.asScala) this } + def addEdge[T](junctionOut: javadsl.JunctionOutPort[T], junctionIn: javadsl.JunctionInPort[T]): FlowGraphBuilder = + addEdge[T, T](junctionOut, javadsl.Flow.empty[T], junctionIn); + def addEdge[In, Out](source: javadsl.Source[In], flow: javadsl.Flow[In, Out], junctionIn: javadsl.JunctionInPort[Out]): FlowGraphBuilder = { b.addEdge(source.asScala, flow.asScala, junctionIn.asScala) this } - def addEdge[In](source: javadsl.Source[In], junctionIn: javadsl.JunctionInPort[In]): FlowGraphBuilder = { - b.addEdge(source.asScala, junctionIn.asScala) - this - } - - def addEdge[In, Out](junctionOut: javadsl.JunctionOutPort[In], sink: Sink[In]): FlowGraphBuilder = { - b.addEdge(junctionOut.asScala, sink.asScala) - this - } + def addEdge[T](source: javadsl.Source[T], junctionIn: javadsl.JunctionInPort[T]): FlowGraphBuilder = + addEdge[T, T](source, javadsl.Flow.empty[T], junctionIn); def addEdge[In, Out](junctionOut: javadsl.JunctionOutPort[In], flow: javadsl.Flow[In, Out], sink: Sink[Out]): FlowGraphBuilder = { b.addEdge(junctionOut.asScala, flow.asScala, sink.asScala) this } + def addEdge[T](junctionOut: javadsl.JunctionOutPort[T], sink: Sink[T]): FlowGraphBuilder = + addEdge[T, T](junctionOut, javadsl.Flow.empty[T], sink); + def addEdge[In, Out](source: javadsl.Source[In], flow: javadsl.Flow[In, Out], sink: Sink[Out]): FlowGraphBuilder = { b.addEdge(source.asScala, flow.asScala, sink.asScala) this } + def addEdge[T](source: javadsl.Source[T], sink: Sink[T]): FlowGraphBuilder = + addEdge[T, T](source, javadsl.Flow.empty[T], sink); + def addEdge[In, Out](source: javadsl.UndefinedSource[In], flow: javadsl.Flow[In, Out], sink: javadsl.UndefinedSink[Out]): FlowGraphBuilder = { b.addEdge(source.asScala, flow.asScala, sink.asScala) this } + def addEdge[T](source: javadsl.UndefinedSource[T], sink: javadsl.UndefinedSink[T]): FlowGraphBuilder = + addEdge[T, T](source, javadsl.Flow.empty[T], sink); + def addEdge[In, Out](source: javadsl.UndefinedSource[In], flow: javadsl.Flow[In, Out], sink: javadsl.Sink[Out]): FlowGraphBuilder = { b.addEdge(source.asScala, flow.asScala, sink.asScala) this } + def addEdge[T](source: javadsl.UndefinedSource[T], sink: javadsl.Sink[T]): FlowGraphBuilder = + addEdge[T, T](source, javadsl.Flow.empty[T], sink); + def addEdge[In, Out](source: javadsl.Source[In], flow: javadsl.Flow[In, Out], sink: javadsl.UndefinedSink[Out]): FlowGraphBuilder = { b.addEdge(source.asScala, flow.asScala, sink.asScala) this } + def addEdge[T](source: javadsl.Source[T], sink: javadsl.UndefinedSink[T]): FlowGraphBuilder = + addEdge[T, T](source, javadsl.Flow.empty[T], sink); + def attachSink[Out](token: javadsl.UndefinedSink[Out], sink: Sink[Out]): FlowGraphBuilder = { b.attachSink(token.asScala, sink.asScala) this @@ -511,8 +551,8 @@ class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) { * After importing you can [[#connect]] undefined sources and sinks in * two different `PartialFlowGraph` instances. */ - def importPartialFlowGraph(partialFlowGraph: scaladsl.PartialFlowGraph): FlowGraphBuilder = { - b.importPartialFlowGraph(partialFlowGraph) + def importPartialFlowGraph(partialFlowGraph: javadsl.PartialFlowGraph): FlowGraphBuilder = { + b.importPartialFlowGraph(partialFlowGraph.asScala) this } @@ -541,8 +581,6 @@ class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) { } -object PartialFlowGraphBuilder extends FlowGraphBuilder - class PartialFlowGraph(delegate: scaladsl.PartialFlowGraph) { import akka.stream.scaladsl.JavaConverters._ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 6c5751ccb3..3bba5ef823 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -108,9 +108,16 @@ object Source { * Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects * a [[FlowGraphBuilder]] and returns the `UndefinedSink`. */ - def from[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSink[T]]): Source[T] = + def fromGraph[T](graph: PartialFlowGraph, block: japi.Function[FlowGraphBuilder, UndefinedSink[T]]): Source[T] = new Source(scaladsl.Source(graph.asScala)(x ⇒ block.apply(x.asJava).asScala)) + /** + * Creates a `Source` by using a [[FlowGraphBuilder]] from on a block that expects + * a [[FlowGraphBuilder]] and returns the `UndefinedSink`. + */ + def fromGraph[T](block: japi.Function[FlowGraphBuilder, UndefinedSink[T]]): Source[T] = + new Source(scaladsl.Source()(x ⇒ block.apply(x.asJava).asScala)) + /** * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should