Rename RunnableFlow to RunnableGraph

This commit is contained in:
Endre Sándor Varga 2015-06-23 18:41:55 +02:00
parent 7879a5521b
commit c7a974dd1e
23 changed files with 102 additions and 102 deletions

View file

@ -45,14 +45,14 @@ Sink
Flow Flow
A processing stage which has *exactly one input and output*, which connects its up- and downstreams by A processing stage which has *exactly one input and output*, which connects its up- and downstreams by
transforming the data elements flowing through it. transforming the data elements flowing through it.
RunnableFlow RunnableGraph
A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend
a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink, a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink,
it will be represented by the ``RunnableFlow`` type, indicating that it is ready to be executed. it will be represented by the ``RunnableGraph`` type, indicating that it is ready to be executed.
It is important to remember that even after constructing the ``RunnableFlow`` by connecting all the source, sink and It is important to remember that even after constructing the ``RunnableGraph`` by connecting all the source, sink and
different processing stages, no data will flow through it until it is materialized. Materialization is the process of different processing stages, no data will flow through it until it is materialized. Materialization is the process of
allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve
starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable, starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
@ -61,7 +61,7 @@ one actor prepare the work, and then have it be materialized at some completely
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#materialization-in-steps .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#materialization-in-steps
After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both After running (materializing) the ``RunnableGraph`` we get a special container object, the ``MaterializedMap``. Both
sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation
dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values, of the folding process over the stream. In general, a stream can expose multiple materialized values,

View file

@ -60,7 +60,7 @@ or ending a :class:`Flow`.
By looking at the snippets above, it should be apparent that the ``builder`` object is *mutable*. By looking at the snippets above, it should be apparent that the ``builder`` object is *mutable*.
The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles. The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles.
Once the FlowGraph has been constructed though, the :class:`RunnableFlow` instance *is immutable, thread-safe, and freely shareable*. Once the FlowGraph has been constructed though, the :class:`RunnableGraph` instance *is immutable, thread-safe, and freely shareable*.
The same is true of all flow pieces—sources, sinks, and flows—once they are constructed. The same is true of all flow pieces—sources, sinks, and flows—once they are constructed.
This means that you can safely re-use one given Flow in multiple places in a processing graph. This means that you can safely re-use one given Flow in multiple places in a processing graph.
@ -88,8 +88,8 @@ all of its different phases in different places and in the end connect them all
This can be achieved using ``FlowGraph.factory().partial()`` instead of This can be achieved using ``FlowGraph.factory().partial()`` instead of
``FlowGraph.factory().closed()``, which will return a ``Graph`` instead of a ``FlowGraph.factory().closed()``, which will return a ``Graph`` instead of a
``RunnableFlow``. The reason of representing it as a different type is that a ``RunnableGraph``. The reason of representing it as a different type is that a
:class:`RunnableFlow` requires all ports to be connected, and if they are not :class:`RunnableGraph` requires all ports to be connected, and if they are not
it will throw an exception at construction time, which helps to avoid simple it will throw an exception at construction time, which helps to avoid simple
wiring errors while working with graphs. A partial flow graph however allows wiring errors while working with graphs. A partial flow graph however allows
you to return the set of yet to be connected ports from the code block that you to return the set of yet to be connected ports from the code block that

View file

@ -146,8 +146,8 @@ First, we prepare the :class:`FoldSink` which will be used to sum all ``Integer`
Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``,
finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` be ``run()``, as indicated by its type: :class:`RunnableGraph`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``, to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph`` or ``FlowGraph`` is ``MaterializedMap``,
which can be used to retrieve materialized values from the running stream. which can be used to retrieve materialized values from the running stream.
In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
@ -155,7 +155,7 @@ obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Mat
as ``Future<Integer>`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. as ``Future<Integer>`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure. In case of the stream failing, this future would complete with a Failure.
The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableGraph` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream,
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
will be different, as illustrated by this example: will be different, as illustrated by this example:

View file

@ -35,8 +35,8 @@ class FlowDocSpec extends AkkaSpec {
val source = Source(1 to 10) val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _) val sink = Sink.fold[Int, Int](0)(_ + _)
// connect the Source to the Sink, obtaining a RunnableFlow // connect the Source to the Sink, obtaining a RunnableGraph
val runnable: RunnableFlow[Future[Int]] = source.toMat(sink)(Keep.right) val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
// materialize the flow and get the value of the FoldSink // materialize the flow and get the value of the FoldSink
val sum: Future[Int] = runnable.run() val sum: Future[Int] = runnable.run()
@ -56,9 +56,9 @@ class FlowDocSpec extends AkkaSpec {
"materialization is unique" in { "materialization is unique" in {
//#stream-reuse //#stream-reuse
// connect the Source to the Sink, obtaining a RunnableFlow // connect the Source to the Sink, obtaining a RunnableGraph
val sink = Sink.fold[Int, Int](0)(_ + _) val sink = Sink.fold[Int, Int](0)(_ + _)
val runnable: RunnableFlow[Future[Int]] = val runnable: RunnableGraph[Future[Int]] =
Source(1 to 10).toMat(sink)(Keep.right) Source(1 to 10).toMat(sink)(Keep.right)
// get the materialized value of the FoldSink // get the materialized value of the FoldSink
@ -162,11 +162,11 @@ class FlowDocSpec extends AkkaSpec {
val sink: Sink[Int, Future[Int]] = Sink.head[Int] val sink: Sink[Int, Future[Int]] = Sink.head[Int]
// By default, the materialized value of the leftmost stage is preserved // By default, the materialized value of the leftmost stage is preserved
val r1: RunnableFlow[Promise[Unit]] = source.via(flow).to(sink) val r1: RunnableGraph[Promise[Unit]] = source.via(flow).to(sink)
// Simple selection of materialized values by using Keep.right // Simple selection of materialized values by using Keep.right
val r2: RunnableFlow[Cancellable] = source.viaMat(flow)(Keep.right).to(sink) val r2: RunnableGraph[Cancellable] = source.viaMat(flow)(Keep.right).to(sink)
val r3: RunnableFlow[Future[Int]] = source.via(flow).toMat(sink)(Keep.right) val r3: RunnableGraph[Future[Int]] = source.via(flow).toMat(sink)(Keep.right)
// Using runWith will always give the materialized values of the stages added // Using runWith will always give the materialized values of the stages added
// by runWith() itself // by runWith() itself
@ -175,21 +175,21 @@ class FlowDocSpec extends AkkaSpec {
val r6: (Promise[Unit], Future[Int]) = flow.runWith(source, sink) val r6: (Promise[Unit], Future[Int]) = flow.runWith(source, sink)
// Using more complext combinations // Using more complext combinations
val r7: RunnableFlow[(Promise[Unit], Cancellable)] = val r7: RunnableGraph[(Promise[Unit], Cancellable)] =
source.viaMat(flow)(Keep.both).to(sink) source.viaMat(flow)(Keep.both).to(sink)
val r8: RunnableFlow[(Promise[Unit], Future[Int])] = val r8: RunnableGraph[(Promise[Unit], Future[Int])] =
source.via(flow).toMat(sink)(Keep.both) source.via(flow).toMat(sink)(Keep.both)
val r9: RunnableFlow[((Promise[Unit], Cancellable), Future[Int])] = val r9: RunnableGraph[((Promise[Unit], Cancellable), Future[Int])] =
source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both) source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)
val r10: RunnableFlow[(Cancellable, Future[Int])] = val r10: RunnableGraph[(Cancellable, Future[Int])] =
source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both) source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)
// It is also possible to map over the materialized values. In r9 we had a // It is also possible to map over the materialized values. In r9 we had a
// doubly nested pair, but we want to flatten it out // doubly nested pair, but we want to flatten it out
val r11: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] = val r11: RunnableGraph[(Promise[Unit], Cancellable, Future[Int])] =
r9.mapMaterializedValue { r9.mapMaterializedValue {
case ((promise, cancellable), future) => case ((promise, cancellable), future) =>
(promise, cancellable, future) (promise, cancellable, future)
@ -204,7 +204,7 @@ class FlowDocSpec extends AkkaSpec {
future.map(_ + 3) future.map(_ + 3)
// The result of r11 can be also achieved by using the Graph API // The result of r11 can be also achieved by using the Graph API
val r12: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] = val r12: RunnableGraph[(Promise[Unit], Cancellable, Future[Int])] =
FlowGraph.closed(source, flow, sink)((_, _, _)) { implicit builder => FlowGraph.closed(source, flow, sink)((_, _, _)) { implicit builder =>
(src, f, dst) => (src, f, dst) =>
import FlowGraph.Implicits._ import FlowGraph.Implicits._

View file

@ -1,7 +1,7 @@
package docs.stream package docs.stream
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ RunnableFlow, Sink, Source, Flow, Keep } import akka.stream.scaladsl.{ RunnableGraph, Sink, Source, Flow, Keep }
import akka.stream.stage.PushPullStage import akka.stream.stage.PushPullStage
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import org.scalatest.concurrent.{ ScalaFutures, Futures } import org.scalatest.concurrent.{ ScalaFutures, Futures }

View file

@ -146,7 +146,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#email-addresses-mapAsync //#email-addresses-mapAsync
//#send-emails //#send-emails
val sendEmails: RunnableFlow[Unit] = val sendEmails: RunnableGraph[Unit] =
emailAddresses emailAddresses
.mapAsync(4)(address => { .mapAsync(4)(address => {
emailServer.send( emailServer.send(
@ -196,7 +196,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
.mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle)) .mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress } .collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableFlow[Unit] = val sendEmails: RunnableGraph[Unit] =
emailAddresses emailAddresses
.mapAsyncUnordered(4)(address => { .mapAsyncUnordered(4)(address => {
emailServer.send( emailServer.send(
@ -231,7 +231,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#blocking-mapAsync //#blocking-mapAsync
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher") val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val sendTextMessages: RunnableFlow[Unit] = val sendTextMessages: RunnableGraph[Unit] =
phoneNumbers phoneNumbers
.mapAsync(4)(phoneNo => { .mapAsync(4)(phoneNo => {
Future { Future {
@ -271,7 +271,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet")) smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
} }
.withAttributes(ActorAttributes.dispatcher("blocking-dispatcher")) .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))
val sendTextMessages: RunnableFlow[Unit] = val sendTextMessages: RunnableGraph[Unit] =
phoneNumbers.via(send).to(Sink.ignore) phoneNumbers.via(send).to(Sink.ignore)
sendTextMessages.run() sendTextMessages.run()
@ -294,7 +294,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val akkaTweets: Source[Tweet, Unit] = tweets.filter(_.hashtags.contains(akka)) val akkaTweets: Source[Tweet, Unit] = tweets.filter(_.hashtags.contains(akka))
implicit val timeout = Timeout(3.seconds) implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableFlow[Unit] = val saveTweets: RunnableGraph[Unit] =
akkaTweets akkaTweets
.mapAsync(4)(tweet => database ? Save(tweet)) .mapAsync(4)(tweet => database ? Save(tweet))
.to(Sink.ignore) .to(Sink.ignore)

View file

@ -157,7 +157,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#tweets-fold-count //#tweets-fold-count
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counter: RunnableFlow[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right) val counter: RunnableGraph[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right)
val sum: Future[Int] = counter.run() val sum: Future[Int] = counter.run()
@ -176,20 +176,20 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#tweets-runnable-flow-materialized-twice //#tweets-runnable-flow-materialized-twice
val sumSink = Sink.fold[Int, Int](0)(_ + _) val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableFlow: RunnableFlow[Future[Int]] = val counterRunnableGraph: RunnableGraph[Future[Int]] =
tweetsInMinuteFromNow tweetsInMinuteFromNow
.filter(_.hashtags contains akka) .filter(_.hashtags contains akka)
.map(t => 1) .map(t => 1)
.toMat(sumSink)(Keep.right) .toMat(sumSink)(Keep.right)
// materialize the stream once in the morning // materialize the stream once in the morning
val morningTweetsCount: Future[Int] = counterRunnableFlow.run() val morningTweetsCount: Future[Int] = counterRunnableGraph.run()
// and once in the evening, reusing the flow // and once in the evening, reusing the flow
val eveningTweetsCount: Future[Int] = counterRunnableFlow.run() val eveningTweetsCount: Future[Int] = counterRunnableGraph.run()
//#tweets-runnable-flow-materialized-twice //#tweets-runnable-flow-materialized-twice
val sum: Future[Int] = counterRunnableFlow.run() val sum: Future[Int] = counterRunnableGraph.run()
sum.map { c => println(s"Total tweets processed: $c") } sum.map { c => println(s"Total tweets processed: $c") }
} }

View file

@ -45,14 +45,14 @@ Sink
Flow Flow
A processing stage which has *exactly one input and output*, which connects its up- and downstreams by A processing stage which has *exactly one input and output*, which connects its up- and downstreams by
transforming the data elements flowing through it. transforming the data elements flowing through it.
RunnableFlow RunnableGraph
A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend
a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink, a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink,
it will be represented by the ``RunnableFlow`` type, indicating that it is ready to be executed. it will be represented by the ``RunnableGraph`` type, indicating that it is ready to be executed.
It is important to remember that even after constructing the ``RunnableFlow`` by connecting all the source, sink and It is important to remember that even after constructing the ``RunnableGraph`` by connecting all the source, sink and
different processing stages, no data will flow through it until it is materialized. Materialization is the process of different processing stages, no data will flow through it until it is materialized. Materialization is the process of
allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve
starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable, starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
@ -61,7 +61,7 @@ one actor prepare the work, and then have it be materialized at some completely
.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps .. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps
After running (materializing) the ``RunnableFlow[T]`` we get back the materialized value of type T. Every stream processing After running (materializing) the ``RunnableGraph[T]`` we get back the materialized value of type T. Every stream processing
stage can produce a materialized value, and it is the responsibility of the user to combine them to a new type. stage can produce a materialized value, and it is the responsibility of the user to combine them to a new type.
In the above example we used ``toMat`` to indicate that we want to transform the materialized value of the source and In the above example we used ``toMat`` to indicate that we want to transform the materialized value of the source and
sink, and we used the convenience function ``Keep.right`` to say that we are only interested in the materialized value sink, and we used the convenience function ``Keep.right`` to say that we are only interested in the materialized value

View file

@ -96,8 +96,8 @@ all of its different phases in different places and in the end connect them all
This can be achieved using ``FlowGraph.partial`` instead of This can be achieved using ``FlowGraph.partial`` instead of
``FlowGraph.closed``, which will return a ``Graph`` instead of a ``FlowGraph.closed``, which will return a ``Graph`` instead of a
``RunnableFlow``. The reason of representing it as a different type is that a ``RunnableGraph``. The reason of representing it as a different type is that a
:class:`RunnableFlow` requires all ports to be connected, and if they are not :class:`RunnableGraph` requires all ports to be connected, and if they are not
it will throw an exception at construction time, which helps to avoid simple it will throw an exception at construction time, which helps to avoid simple
wiring errors while working with graphs. A partial flow graph however allows wiring errors while working with graphs. A partial flow graph however allows
you to return the set of yet to be connected ports from the code block that you to return the set of yet to be connected ports from the code block that

View file

@ -147,16 +147,16 @@ finally we connect the flow using ``toMat`` the previously prepared Sink. Rememb
materialized. When you chain these together, you can explicitly combine their materialized values: in our example we materialized. When you chain these together, you can explicitly combine their materialized values: in our example we
used the ``Keep.right`` predefined function, which tells the implementation to only care about the materialized used the ``Keep.right`` predefined function, which tells the implementation to only care about the materialized
type of the stage currently appended to the right. As you can notice, the materialized type of sumSink is ``Future[Int]`` type of the stage currently appended to the right. As you can notice, the materialized type of sumSink is ``Future[Int]``
and because of using ``Keep.right``, the resulting :class:`RunnableFlow` has also a type parameter of ``Future[Int]``. and because of using ``Keep.right``, the resulting :class:`RunnableGraph` has also a type parameter of ``Future[Int]``.
This step does *not* yet materialize the This step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` be ``run()``, as indicated by its type: :class:`RunnableGraph[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow[T]`` is of type ``T``. to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph[T]`` is of type ``T``.
In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream. In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure. In case of the stream failing, this future would complete with a Failure.
A :class:`RunnableFlow` may be reused A :class:`RunnableGraph` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream,
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
will be different, as illustrated by this example: will be different, as illustrated by this example:

View file

@ -89,7 +89,7 @@ private object PoolSlot {
var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _ var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _
var inflightRequests = immutable.Queue.empty[RequestContext] var inflightRequests = immutable.Queue.empty[RequestContext]
val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local)) val runnableGraph = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local))
.via(connectionFlow) .via(connectionFlow)
.toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both) .toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both)
@ -111,7 +111,7 @@ private object PoolSlot {
val unconnected: Receive = { val unconnected: Receive = {
case OnNext(rc: RequestContext) case OnNext(rc: RequestContext)
val (connInport, connOutport) = runnableFlow.run() val (connInport, connOutport) = runnableGraph.run()
connOutport ! Request(totalDemand) connOutport ! Request(totalDemand)
context.become(waitingForDemandFromConnection(connInport, connOutport, rc)) context.become(waitingForDemandFromConnection(connInport, connOutport, rc))

View file

@ -19,8 +19,8 @@ class DslConsistencySpec extends WordSpec with Matchers {
val sSinkClass = classOf[akka.stream.scaladsl.Sink[_, _]] val sSinkClass = classOf[akka.stream.scaladsl.Sink[_, _]]
val jSinkClass = classOf[akka.stream.javadsl.Sink[_, _]] val jSinkClass = classOf[akka.stream.javadsl.Sink[_, _]]
val jRunnableFlowClass = classOf[akka.stream.javadsl.RunnableFlow[_]] val jRunnableGraphClass = classOf[akka.stream.javadsl.RunnableGraph[_]]
val sRunnableFlowClass = classOf[akka.stream.scaladsl.RunnableFlow[_]] val sRunnableGraphClass = classOf[akka.stream.scaladsl.RunnableGraph[_]]
val ignore = val ignore =
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
@ -38,8 +38,8 @@ class DslConsistencySpec extends WordSpec with Matchers {
jSourceClass -> Set("timerTransform"), jSourceClass -> Set("timerTransform"),
jSinkClass -> Set(), jSinkClass -> Set(),
sRunnableFlowClass -> Set("builder"), sRunnableGraphClass -> Set("builder"),
jRunnableFlowClass Set("graph", "cyclesAllowed")) jRunnableGraphClass Set("graph", "cyclesAllowed"))
def materializing(m: Method): Boolean = m.getParameterTypes.contains(classOf[ActorMaterializer]) def materializing(m: Method): Boolean = m.getParameterTypes.contains(classOf[ActorMaterializer])
@ -54,7 +54,7 @@ class DslConsistencySpec extends WordSpec with Matchers {
("Source" -> List(sSourceClass, jSourceClass)) :: ("Source" -> List(sSourceClass, jSourceClass)) ::
("Flow" -> List(sFlowClass, jFlowClass)) :: ("Flow" -> List(sFlowClass, jFlowClass)) ::
("Sink" -> List(sSinkClass, jSinkClass)) :: ("Sink" -> List(sSinkClass, jSinkClass)) ::
("RunanbleFlow" -> List(sRunnableFlowClass, jRunnableFlowClass)) :: ("RunanbleFlow" -> List(sRunnableGraphClass, jRunnableGraphClass)) ::
Nil foreach { Nil foreach {
case (element, classes) case (element, classes)

View file

@ -38,7 +38,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::
(classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) :: (classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) ::
(classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) :: (classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) ::
(classOf[akka.stream.scaladsl.RunnableFlow[_]], classOf[akka.stream.javadsl.RunnableFlow[_]]) :: (classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) ::
Nil Nil
// format: ON // format: ON

View file

@ -93,9 +93,9 @@ class FlowCompileSpec extends AkkaSpec {
} }
} }
"RunnableFlow" should { "RunnableGraph" should {
Sink.head[String] Sink.head[String]
val closed: RunnableFlow[Publisher[String]] = val closed: RunnableGraph[Publisher[String]] =
Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String])(Keep.right) Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String])(Keep.right)
"run" in { "run" in {
closed.run() closed.run()

View file

@ -10,7 +10,7 @@ import akka.japi.function
trait GraphCreate { trait GraphCreate {
import language.implicitConversions import language.implicitConversions
private implicit def r[M](run: scaladsl.RunnableFlow[M]): RunnableFlow[M] = new RunnableFlowAdapter(run) private implicit def r[M](run: scaladsl.RunnableGraph[M]): RunnableGraph[M] = new RunnableGraphAdapter(run)
/** /**
* Creates a new fully connected graph by passing a [[FlowGraph.Builder]] to the given create function. * Creates a new fully connected graph by passing a [[FlowGraph.Builder]] to the given create function.
@ -18,7 +18,7 @@ trait GraphCreate {
* The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown.
*/ */
@throws(classOf[IllegalArgumentException]) @throws(classOf[IllegalArgumentException])
def closed(block: function.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] = def closed(block: function.Procedure[FlowGraph.Builder[Unit]]): RunnableGraph[Unit] =
scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) } scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) }
/** /**
@ -37,7 +37,7 @@ trait GraphCreate {
*/ */
@throws(classOf[IllegalArgumentException]) @throws(classOf[IllegalArgumentException])
def closed[S1 <: Shape, M](g1: Graph[S1, M], def closed[S1 <: Shape, M](g1: Graph[S1, M],
block: function.Procedure2[FlowGraph.Builder[M], S1]): RunnableFlow[M] = block: function.Procedure2[FlowGraph.Builder[M], S1]): RunnableGraph[M] =
scaladsl.FlowGraph.closed(g1) { b ⇒ s => block.apply(b.asJava, s) } scaladsl.FlowGraph.closed(g1) { b ⇒ s => block.apply(b.asJava, s) }
/** /**
@ -58,7 +58,7 @@ trait GraphCreate {
*/ */
@throws(classOf[IllegalArgumentException]) @throws(classOf[IllegalArgumentException])
def closed[S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: function.Function2[M1, M2, M], def closed[S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: function.Function2[M1, M2, M],
block: function.Procedure3[FlowGraph.Builder[M], S1, S2]): RunnableFlow[M] = block: function.Procedure3[FlowGraph.Builder[M], S1, S2]): RunnableGraph[M] =
scaladsl.FlowGraph.closed(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) } scaladsl.FlowGraph.closed(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }
/** /**
@ -79,7 +79,7 @@ trait GraphCreate {
*/ */
@throws(classOf[IllegalArgumentException]) @throws(classOf[IllegalArgumentException])
def closed1[[#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: function.Function1[[#M1#], M], def closed1[[#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: function.Function1[[#M1#], M],
block: function.Procedure2[FlowGraph.Builder[M], [#S1#]]): RunnableFlow[M] = block: function.Procedure2[FlowGraph.Builder[M], [#S1#]]): RunnableGraph[M] =
scaladsl.FlowGraph.closed([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) } scaladsl.FlowGraph.closed([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }
/** /**

View file

@ -14,7 +14,7 @@ trait GraphApply {
* *
* The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown.
*/ */
def closed()(buildBlock: (FlowGraph.Builder[Unit]) ⇒ Unit): RunnableFlow[Unit] = { def closed()(buildBlock: (FlowGraph.Builder[Unit]) ⇒ Unit): RunnableGraph[Unit] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
buildBlock(builder) buildBlock(builder)
builder.buildRunnable() builder.buildRunnable()
@ -39,7 +39,7 @@ trait GraphApply {
* *
* The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown.
*/ */
def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ Unit): RunnableFlow[Mat] = { def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ Unit): RunnableGraph[Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val p1 = builder.add(g1) val p1 = builder.add(g1)
buildBlock(builder)(p1) buildBlock(builder)(p1)
@ -68,7 +68,7 @@ trait GraphApply {
* *
* The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown.
*/ */
def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = { def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableGraph[Mat] = {
val builder = new FlowGraph.Builder val builder = new FlowGraph.Builder
val curried = combineMat.curried val curried = combineMat.curried
val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1))

View file

@ -65,12 +65,12 @@ private[akka] case class ActorMaterializerImpl(
} }
} }
override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = { override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
if (haveShutDown.get()) if (haveShutDown.get())
throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.") throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.")
if (StreamLayout.Debug) runnableFlow.module.validate() if (StreamLayout.Debug) runnableGraph.module.validate()
val session = new MaterializerSession(runnableFlow.module) { val session = new MaterializerSession(runnableGraph.module) {
private val flowName = createFlowName() private val flowName = createFlowName()
private var nextId = 0 private var nextId = 0
private def stageName(attr: Attributes): String = { private def stageName(attr: Attributes): String = {

View file

@ -95,16 +95,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
new Sink(delegate.toMat(sink)(combinerToScala(combine))) new Sink(delegate.toMat(sink)(combinerToScala(combine)))
/** /**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]
*/ */
def join[M](flow: Graph[FlowShape[Out, In], M]): javadsl.RunnableFlow[Mat] = def join[M](flow: Graph[FlowShape[Out, In], M]): javadsl.RunnableGraph[Mat] =
new RunnableFlowAdapter(delegate.join(flow)) new RunnableGraphAdapter(delegate.join(flow))
/** /**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]
*/ */
def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
new RunnableFlowAdapter(delegate.joinMat(flow)(combinerToScala(combine))) new RunnableGraphAdapter(delegate.joinMat(flow)(combinerToScala(combine)))
/** /**
* Join this [[Flow]] to a [[BidiFlow]] to close off the top of the protocol stack: * Join this [[Flow]] to a [[BidiFlow]] to close off the top of the protocol stack:
@ -789,28 +789,28 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* *
* Flow with attached input and output, can be executed. * Flow with attached input and output, can be executed.
*/ */
trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] { trait RunnableGraph[+Mat] extends Graph[ClosedShape, Mat] {
/** /**
* Run this flow and return the materialized values of the flow. * Run this flow and return the materialized values of the flow.
*/ */
def run(materializer: Materializer): Mat def run(materializer: Materializer): Mat
/** /**
* Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
*/ */
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraph[Mat2]
} }
/** INTERNAL API */ /** INTERNAL API */
private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] { private[akka] class RunnableGraphAdapter[Mat](runnable: scaladsl.RunnableGraph[Mat]) extends RunnableGraph[Mat] {
def shape = ClosedShape def shape = ClosedShape
def module = runnable.module def module = runnable.module
override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] = override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraph[Mat2] =
new RunnableFlowAdapter(runnable.mapMaterializedValue(f.apply _)) new RunnableGraphAdapter(runnable.mapMaterializedValue(f.apply _))
override def run(materializer: Materializer): Mat = runnable.run()(materializer) override def run(materializer: Materializer): Mat = runnable.run()(materializer)
override def withAttributes(attr: Attributes): RunnableFlow[Mat] = override def withAttributes(attr: Attributes): RunnableGraph[Mat] =
new RunnableFlowAdapter(runnable.withAttributes(attr)) new RunnableGraphAdapter(runnable.withAttributes(attr))
override def named(name: String): RunnableFlow[Mat] = override def named(name: String): RunnableGraph[Mat] =
new RunnableFlowAdapter(runnable.named(name)) new RunnableGraphAdapter(runnable.named(name))
} }

View file

@ -260,14 +260,14 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/** /**
* Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
*/ */
def to[M](sink: Graph[SinkShape[Out], M]): javadsl.RunnableFlow[Mat] = def to[M](sink: Graph[SinkShape[Out], M]): javadsl.RunnableGraph[Mat] =
new RunnableFlowAdapter(delegate.to(sink)) new RunnableGraphAdapter(delegate.to(sink))
/** /**
* Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
*/ */
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] =
new RunnableFlowAdapter(delegate.toMat(sink)(combinerToScala(combine))) new RunnableGraphAdapter(delegate.toMat(sink)(combinerToScala(combine)))
/** /**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value

View file

@ -134,7 +134,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
new Flow(module.transformMaterializedValue(f.asInstanceOf[Any Any])) new Flow(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/** /**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]. * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{ * {{{
* +------+ +-------+ * +------+ +-------+
* | | ~Out~> | | * | | ~Out~> | |
@ -146,10 +146,10 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* value of the current flow (ignoring the other Flows value), use * value of the current flow (ignoring the other Flows value), use
* [[Flow#joinMat[Mat2* joinMat]] if a different strategy is needed. * [[Flow#joinMat[Mat2* joinMat]] if a different strategy is needed.
*/ */
def join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableFlow[Mat] = joinMat(flow)(Keep.left) def join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableGraph[Mat] = joinMat(flow)(Keep.left)
/** /**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]
* {{{ * {{{
* +------+ +-------+ * +------+ +-------+
* | | ~Out~> | | * | | ~Out~> | |
@ -160,9 +160,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* The `combine` function is used to compose the materialized values of this flow and that * The `combine` function is used to compose the materialized values of this flow and that
* Flow into the materialized value of the resulting Flow. * Flow into the materialized value of the resulting Flow.
*/ */
def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) Mat3): RunnableFlow[Mat3] = { def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] = {
val flowCopy = flow.module.carbonCopy val flowCopy = flow.module.carbonCopy
RunnableFlow( RunnableGraph(
module module
.grow(flowCopy, combine) .grow(flowCopy, combine)
.connect(shape.outlet, flowCopy.shape.inlets.head) .connect(shape.outlet, flowCopy.shape.inlets.head)
@ -318,14 +318,14 @@ object Flow extends FlowApply {
/** /**
* Flow with attached input and output, can be executed. * Flow with attached input and output, can be executed.
*/ */
case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] { case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] {
assert(module.isRunnable) assert(module.isRunnable)
def shape = ClosedShape def shape = ClosedShape
/** /**
* Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
*/ */
def mapMaterializedValue[Mat2](f: Mat Mat2): RunnableFlow[Mat2] = def mapMaterializedValue[Mat2](f: Mat Mat2): RunnableGraph[Mat2] =
copy(module.transformMaterializedValue(f.asInstanceOf[Any Any])) copy(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/** /**
@ -333,10 +333,10 @@ case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) e
*/ */
def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
override def withAttributes(attr: Attributes): RunnableFlow[Mat] = override def withAttributes(attr: Attributes): RunnableGraph[Mat] =
new RunnableFlow(module.withAttributes(attr).wrap) new RunnableGraph(module.withAttributes(attr).wrap)
override def named(name: String): RunnableFlow[Mat] = withAttributes(Attributes.name(name)) override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name))
} }

View file

@ -380,13 +380,13 @@ object FlowGraph extends GraphApply {
.connect(port, op.inPort) .connect(port, op.inPort)
} }
private[stream] def buildRunnable[Mat](): RunnableFlow[Mat] = { private[stream] def buildRunnable[Mat](): RunnableGraph[Mat] = {
if (!moduleInProgress.isRunnable) { if (!moduleInProgress.isRunnable) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Cannot build the RunnableFlow because there are unconnected ports: " + "Cannot build the RunnableGraph because there are unconnected ports: " +
(moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", ")) (moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", "))
} }
new RunnableFlow(moduleInProgress.wrap()) new RunnableGraph(moduleInProgress.wrap())
} }
private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = { private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = {

View file

@ -70,15 +70,15 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
* Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
* concatenating the processing steps of both. * concatenating the processing steps of both.
*/ */
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableFlow[Mat] = toMat(sink)(Keep.left) def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left)
/** /**
* Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
* concatenating the processing steps of both. * concatenating the processing steps of both.
*/ */
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): RunnableFlow[Mat3] = { def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] = {
val sinkCopy = sink.module.carbonCopy val sinkCopy = sink.module.carbonCopy
RunnableFlow(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)) RunnableGraph(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine))
} }
/** /**

View file

@ -21,8 +21,8 @@ package akka.stream
* [[org.reactivestreams.Subscriber]]. A flow with an attached output and open input * [[org.reactivestreams.Subscriber]]. A flow with an attached output and open input
* is also a [[Sink]]. * is also a [[Sink]].
* *
* If a flow has both an attached input and an attached output it becomes a [[RunnableFlow]]. * If a flow has both an attached input and an attached output it becomes a [[RunnableGraph]].
* In order to execute this pipeline the flow must be materialized by calling [[RunnableFlow#run]] on it. * In order to execute this pipeline the flow must be materialized by calling [[RunnableGraph#run]] on it.
* *
* You can create your `Source`, `Flow` and `Sink` in any order and then wire them together before * You can create your `Source`, `Flow` and `Sink` in any order and then wire them together before
* they are materialized by connecting them using [[Flow#via]] and [[Flow#to]], or connecting them into a * they are materialized by connecting them using [[Flow#via]] and [[Flow#to]], or connecting them into a