From c7a974dd1e890e9c73233764a6bbeadeeff2a2bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 23 Jun 2015 18:41:55 +0200 Subject: [PATCH] Rename RunnableFlow to RunnableGraph --- .../rst/java/stream-flows-and-basics.rst | 8 ++--- akka-docs-dev/rst/java/stream-graphs.rst | 6 ++-- akka-docs-dev/rst/java/stream-quickstart.rst | 6 ++-- .../scala/code/docs/stream/FlowDocSpec.scala | 26 ++++++++-------- .../code/docs/stream/FlowStagesSpec.scala | 2 +- .../code/docs/stream/IntegrationDocSpec.scala | 10 +++---- .../TwitterStreamQuickstartDocSpec.scala | 10 +++---- .../rst/scala/stream-flows-and-basics.rst | 8 ++--- akka-docs-dev/rst/scala/stream-graphs.rst | 4 +-- akka-docs-dev/rst/scala/stream-quickstart.rst | 8 ++--- .../http/impl/engine/client/PoolSlot.scala | 4 +-- .../akka/stream/DslConsistencySpec.scala | 10 +++---- .../stream/DslFactoriesConsistencySpec.scala | 2 +- .../stream/scaladsl/FlowCompileSpec.scala | 4 +-- .../stream/javadsl/GraphCreate.scala.template | 10 +++---- .../stream/scaladsl/GraphApply.scala.template | 6 ++-- .../stream/impl/ActorMaterializerImpl.scala | 6 ++-- .../main/scala/akka/stream/javadsl/Flow.scala | 30 +++++++++---------- .../scala/akka/stream/javadsl/Source.scala | 6 ++-- .../scala/akka/stream/scaladsl/Flow.scala | 22 +++++++------- .../scala/akka/stream/scaladsl/Graph.scala | 6 ++-- .../scala/akka/stream/scaladsl/Source.scala | 6 ++-- .../scala/akka/stream/scaladsl/package.scala | 4 +-- 23 files changed, 102 insertions(+), 102 deletions(-) diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst index 287ab07052..969f0daacc 100644 --- a/akka-docs-dev/rst/java/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -45,14 +45,14 @@ Sink Flow A processing stage which has *exactly one input and output*, which connects its up- and downstreams by transforming the data elements flowing through it. -RunnableFlow +RunnableGraph A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink, -it will be represented by the ``RunnableFlow`` type, indicating that it is ready to be executed. +it will be represented by the ``RunnableGraph`` type, indicating that it is ready to be executed. -It is important to remember that even after constructing the ``RunnableFlow`` by connecting all the source, sink and +It is important to remember that even after constructing the ``RunnableGraph`` by connecting all the source, sink and different processing stages, no data will flow through it until it is materialized. Materialization is the process of allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable, @@ -61,7 +61,7 @@ one actor prepare the work, and then have it be materialized at some completely .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#materialization-in-steps -After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both +After running (materializing) the ``RunnableGraph`` we get a special container object, the ``MaterializedMap``. Both sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result of the folding process over the stream. In general, a stream can expose multiple materialized values, diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst index 96dfcd532d..19473ac18e 100644 --- a/akka-docs-dev/rst/java/stream-graphs.rst +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -60,7 +60,7 @@ or ending a :class:`Flow`. By looking at the snippets above, it should be apparent that the ``builder`` object is *mutable*. The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles. -Once the FlowGraph has been constructed though, the :class:`RunnableFlow` instance *is immutable, thread-safe, and freely shareable*. +Once the FlowGraph has been constructed though, the :class:`RunnableGraph` instance *is immutable, thread-safe, and freely shareable*. The same is true of all flow pieces—sources, sinks, and flows—once they are constructed. This means that you can safely re-use one given Flow in multiple places in a processing graph. @@ -88,8 +88,8 @@ all of its different phases in different places and in the end connect them all This can be achieved using ``FlowGraph.factory().partial()`` instead of ``FlowGraph.factory().closed()``, which will return a ``Graph`` instead of a -``RunnableFlow``. The reason of representing it as a different type is that a -:class:`RunnableFlow` requires all ports to be connected, and if they are not +``RunnableGraph``. The reason of representing it as a different type is that a +:class:`RunnableGraph` requires all ports to be connected, and if they are not it will throw an exception at construction time, which helps to avoid simple wiring errors while working with graphs. A partial flow graph however allows you to return the set of yet to be connected ports from the code block that diff --git a/akka-docs-dev/rst/java/stream-quickstart.rst b/akka-docs-dev/rst/java/stream-quickstart.rst index b4476a6f08..dce68d5817 100644 --- a/akka-docs-dev/rst/java/stream-quickstart.rst +++ b/akka-docs-dev/rst/java/stream-quickstart.rst @@ -146,8 +146,8 @@ First, we prepare the :class:`FoldSink` which will be used to sum all ``Integer` Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can -be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` -to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``, +be ``run()``, as indicated by its type: :class:`RunnableGraph`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` +to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph`` or ``FlowGraph`` is ``MaterializedMap``, which can be used to retrieve materialized values from the running stream. In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map @@ -155,7 +155,7 @@ obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Mat as ``Future`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. In case of the stream failing, this future would complete with a Failure. -The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused +The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableGraph` may be reused and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations will be different, as illustrated by this example: diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 31a255e368..f73f7c3ac9 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -35,8 +35,8 @@ class FlowDocSpec extends AkkaSpec { val source = Source(1 to 10) val sink = Sink.fold[Int, Int](0)(_ + _) - // connect the Source to the Sink, obtaining a RunnableFlow - val runnable: RunnableFlow[Future[Int]] = source.toMat(sink)(Keep.right) + // connect the Source to the Sink, obtaining a RunnableGraph + val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // materialize the flow and get the value of the FoldSink val sum: Future[Int] = runnable.run() @@ -56,9 +56,9 @@ class FlowDocSpec extends AkkaSpec { "materialization is unique" in { //#stream-reuse - // connect the Source to the Sink, obtaining a RunnableFlow + // connect the Source to the Sink, obtaining a RunnableGraph val sink = Sink.fold[Int, Int](0)(_ + _) - val runnable: RunnableFlow[Future[Int]] = + val runnable: RunnableGraph[Future[Int]] = Source(1 to 10).toMat(sink)(Keep.right) // get the materialized value of the FoldSink @@ -162,11 +162,11 @@ class FlowDocSpec extends AkkaSpec { val sink: Sink[Int, Future[Int]] = Sink.head[Int] // By default, the materialized value of the leftmost stage is preserved - val r1: RunnableFlow[Promise[Unit]] = source.via(flow).to(sink) + val r1: RunnableGraph[Promise[Unit]] = source.via(flow).to(sink) // Simple selection of materialized values by using Keep.right - val r2: RunnableFlow[Cancellable] = source.viaMat(flow)(Keep.right).to(sink) - val r3: RunnableFlow[Future[Int]] = source.via(flow).toMat(sink)(Keep.right) + val r2: RunnableGraph[Cancellable] = source.viaMat(flow)(Keep.right).to(sink) + val r3: RunnableGraph[Future[Int]] = source.via(flow).toMat(sink)(Keep.right) // Using runWith will always give the materialized values of the stages added // by runWith() itself @@ -175,21 +175,21 @@ class FlowDocSpec extends AkkaSpec { val r6: (Promise[Unit], Future[Int]) = flow.runWith(source, sink) // Using more complext combinations - val r7: RunnableFlow[(Promise[Unit], Cancellable)] = + val r7: RunnableGraph[(Promise[Unit], Cancellable)] = source.viaMat(flow)(Keep.both).to(sink) - val r8: RunnableFlow[(Promise[Unit], Future[Int])] = + val r8: RunnableGraph[(Promise[Unit], Future[Int])] = source.via(flow).toMat(sink)(Keep.both) - val r9: RunnableFlow[((Promise[Unit], Cancellable), Future[Int])] = + val r9: RunnableGraph[((Promise[Unit], Cancellable), Future[Int])] = source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both) - val r10: RunnableFlow[(Cancellable, Future[Int])] = + val r10: RunnableGraph[(Cancellable, Future[Int])] = source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both) // It is also possible to map over the materialized values. In r9 we had a // doubly nested pair, but we want to flatten it out - val r11: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] = + val r11: RunnableGraph[(Promise[Unit], Cancellable, Future[Int])] = r9.mapMaterializedValue { case ((promise, cancellable), future) => (promise, cancellable, future) @@ -204,7 +204,7 @@ class FlowDocSpec extends AkkaSpec { future.map(_ + 3) // The result of r11 can be also achieved by using the Graph API - val r12: RunnableFlow[(Promise[Unit], Cancellable, Future[Int])] = + val r12: RunnableGraph[(Promise[Unit], Cancellable, Future[Int])] = FlowGraph.closed(source, flow, sink)((_, _, _)) { implicit builder => (src, f, dst) => import FlowGraph.Implicits._ diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala index a8cedd8f11..806da0b4fb 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala @@ -1,7 +1,7 @@ package docs.stream import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ RunnableFlow, Sink, Source, Flow, Keep } +import akka.stream.scaladsl.{ RunnableGraph, Sink, Source, Flow, Keep } import akka.stream.stage.PushPullStage import akka.stream.testkit.AkkaSpec import org.scalatest.concurrent.{ ScalaFutures, Futures } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala index 63042297b4..0c1d6ccf7c 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -146,7 +146,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#email-addresses-mapAsync //#send-emails - val sendEmails: RunnableFlow[Unit] = + val sendEmails: RunnableGraph[Unit] = emailAddresses .mapAsync(4)(address => { emailServer.send( @@ -196,7 +196,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { .mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle)) .collect { case Some(emailAddress) => emailAddress } - val sendEmails: RunnableFlow[Unit] = + val sendEmails: RunnableGraph[Unit] = emailAddresses .mapAsyncUnordered(4)(address => { emailServer.send( @@ -231,7 +231,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#blocking-mapAsync val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher") - val sendTextMessages: RunnableFlow[Unit] = + val sendTextMessages: RunnableGraph[Unit] = phoneNumbers .mapAsync(4)(phoneNo => { Future { @@ -271,7 +271,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet")) } .withAttributes(ActorAttributes.dispatcher("blocking-dispatcher")) - val sendTextMessages: RunnableFlow[Unit] = + val sendTextMessages: RunnableGraph[Unit] = phoneNumbers.via(send).to(Sink.ignore) sendTextMessages.run() @@ -294,7 +294,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val akkaTweets: Source[Tweet, Unit] = tweets.filter(_.hashtags.contains(akka)) implicit val timeout = Timeout(3.seconds) - val saveTweets: RunnableFlow[Unit] = + val saveTweets: RunnableGraph[Unit] = akkaTweets .mapAsync(4)(tweet => database ? Save(tweet)) .to(Sink.ignore) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 53d1bb18d5..4ebc73ca0d 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -157,7 +157,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#tweets-fold-count val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) - val counter: RunnableFlow[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right) + val counter: RunnableGraph[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right) val sum: Future[Int] = counter.run() @@ -176,20 +176,20 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#tweets-runnable-flow-materialized-twice val sumSink = Sink.fold[Int, Int](0)(_ + _) - val counterRunnableFlow: RunnableFlow[Future[Int]] = + val counterRunnableGraph: RunnableGraph[Future[Int]] = tweetsInMinuteFromNow .filter(_.hashtags contains akka) .map(t => 1) .toMat(sumSink)(Keep.right) // materialize the stream once in the morning - val morningTweetsCount: Future[Int] = counterRunnableFlow.run() + val morningTweetsCount: Future[Int] = counterRunnableGraph.run() // and once in the evening, reusing the flow - val eveningTweetsCount: Future[Int] = counterRunnableFlow.run() + val eveningTweetsCount: Future[Int] = counterRunnableGraph.run() //#tweets-runnable-flow-materialized-twice - val sum: Future[Int] = counterRunnableFlow.run() + val sum: Future[Int] = counterRunnableGraph.run() sum.map { c => println(s"Total tweets processed: $c") } } diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index ac2beb8239..12febcff4a 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -45,14 +45,14 @@ Sink Flow A processing stage which has *exactly one input and output*, which connects its up- and downstreams by transforming the data elements flowing through it. -RunnableFlow +RunnableGraph A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink, -it will be represented by the ``RunnableFlow`` type, indicating that it is ready to be executed. +it will be represented by the ``RunnableGraph`` type, indicating that it is ready to be executed. -It is important to remember that even after constructing the ``RunnableFlow`` by connecting all the source, sink and +It is important to remember that even after constructing the ``RunnableGraph`` by connecting all the source, sink and different processing stages, no data will flow through it until it is materialized. Materialization is the process of allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable, @@ -61,7 +61,7 @@ one actor prepare the work, and then have it be materialized at some completely .. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps -After running (materializing) the ``RunnableFlow[T]`` we get back the materialized value of type T. Every stream processing +After running (materializing) the ``RunnableGraph[T]`` we get back the materialized value of type T. Every stream processing stage can produce a materialized value, and it is the responsibility of the user to combine them to a new type. In the above example we used ``toMat`` to indicate that we want to transform the materialized value of the source and sink, and we used the convenience function ``Keep.right`` to say that we are only interested in the materialized value diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index 1f0942515b..d3aa5d8e9e 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -96,8 +96,8 @@ all of its different phases in different places and in the end connect them all This can be achieved using ``FlowGraph.partial`` instead of ``FlowGraph.closed``, which will return a ``Graph`` instead of a -``RunnableFlow``. The reason of representing it as a different type is that a -:class:`RunnableFlow` requires all ports to be connected, and if they are not +``RunnableGraph``. The reason of representing it as a different type is that a +:class:`RunnableGraph` requires all ports to be connected, and if they are not it will throw an exception at construction time, which helps to avoid simple wiring errors while working with graphs. A partial flow graph however allows you to return the set of yet to be connected ports from the code block that diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst index 9ceaf05fca..a04442ea2d 100644 --- a/akka-docs-dev/rst/scala/stream-quickstart.rst +++ b/akka-docs-dev/rst/scala/stream-quickstart.rst @@ -147,16 +147,16 @@ finally we connect the flow using ``toMat`` the previously prepared Sink. Rememb materialized. When you chain these together, you can explicitly combine their materialized values: in our example we used the ``Keep.right`` predefined function, which tells the implementation to only care about the materialized type of the stage currently appended to the right. As you can notice, the materialized type of sumSink is ``Future[Int]`` -and because of using ``Keep.right``, the resulting :class:`RunnableFlow` has also a type parameter of ``Future[Int]``. +and because of using ``Keep.right``, the resulting :class:`RunnableGraph` has also a type parameter of ``Future[Int]``. This step does *not* yet materialize the processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can -be ``run()``, as indicated by its type: :class:`RunnableFlow[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` -to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow[T]`` is of type ``T``. +be ``run()``, as indicated by its type: :class:`RunnableGraph[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` +to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph[T]`` is of type ``T``. In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream. In case of the stream failing, this future would complete with a Failure. -A :class:`RunnableFlow` may be reused +A :class:`RunnableGraph` may be reused and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations will be different, as illustrated by this example: diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index fc349d159e..0a65554ffe 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -89,7 +89,7 @@ private object PoolSlot { var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _ var inflightRequests = immutable.Queue.empty[RequestContext] - val runnableFlow = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local)) + val runnableGraph = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local)) .via(connectionFlow) .toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both) @@ -111,7 +111,7 @@ private object PoolSlot { val unconnected: Receive = { case OnNext(rc: RequestContext) ⇒ - val (connInport, connOutport) = runnableFlow.run() + val (connInport, connOutport) = runnableGraph.run() connOutport ! Request(totalDemand) context.become(waitingForDemandFromConnection(connInport, connOutport, rc)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index f72e037ec6..79688dd703 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -19,8 +19,8 @@ class DslConsistencySpec extends WordSpec with Matchers { val sSinkClass = classOf[akka.stream.scaladsl.Sink[_, _]] val jSinkClass = classOf[akka.stream.javadsl.Sink[_, _]] - val jRunnableFlowClass = classOf[akka.stream.javadsl.RunnableFlow[_]] - val sRunnableFlowClass = classOf[akka.stream.scaladsl.RunnableFlow[_]] + val jRunnableGraphClass = classOf[akka.stream.javadsl.RunnableGraph[_]] + val sRunnableGraphClass = classOf[akka.stream.scaladsl.RunnableGraph[_]] val ignore = Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ @@ -38,8 +38,8 @@ class DslConsistencySpec extends WordSpec with Matchers { jSourceClass -> Set("timerTransform"), jSinkClass -> Set(), - sRunnableFlowClass -> Set("builder"), - jRunnableFlowClass → Set("graph", "cyclesAllowed")) + sRunnableGraphClass -> Set("builder"), + jRunnableGraphClass → Set("graph", "cyclesAllowed")) def materializing(m: Method): Boolean = m.getParameterTypes.contains(classOf[ActorMaterializer]) @@ -54,7 +54,7 @@ class DslConsistencySpec extends WordSpec with Matchers { ("Source" -> List(sSourceClass, jSourceClass)) :: ("Flow" -> List(sFlowClass, jFlowClass)) :: ("Sink" -> List(sSinkClass, jSinkClass)) :: - ("RunanbleFlow" -> List(sRunnableFlowClass, jRunnableFlowClass)) :: + ("RunanbleFlow" -> List(sRunnableGraphClass, jRunnableGraphClass)) :: Nil foreach { case (element, classes) ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index d4b94865c4..cf94e0aff6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -38,7 +38,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: (classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) :: (classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) :: - (classOf[akka.stream.scaladsl.RunnableFlow[_]], classOf[akka.stream.javadsl.RunnableFlow[_]]) :: + (classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) :: Nil // format: ON diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala index 93fba308bd..0d6a0edd47 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala @@ -93,9 +93,9 @@ class FlowCompileSpec extends AkkaSpec { } } - "RunnableFlow" should { + "RunnableGraph" should { Sink.head[String] - val closed: RunnableFlow[Publisher[String]] = + val closed: RunnableGraph[Publisher[String]] = Source(Seq(1, 2, 3)).map(_.toString).toMat(Sink.publisher[String])(Keep.right) "run" in { closed.run() diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template index 7ac6b858ba..adf9c5f697 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/GraphCreate.scala.template @@ -10,7 +10,7 @@ import akka.japi.function trait GraphCreate { import language.implicitConversions - private implicit def r[M](run: scaladsl.RunnableFlow[M]): RunnableFlow[M] = new RunnableFlowAdapter(run) + private implicit def r[M](run: scaladsl.RunnableGraph[M]): RunnableGraph[M] = new RunnableGraphAdapter(run) /** * Creates a new fully connected graph by passing a [[FlowGraph.Builder]] to the given create function. @@ -18,7 +18,7 @@ trait GraphCreate { * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. */ @throws(classOf[IllegalArgumentException]) - def closed(block: function.Procedure[FlowGraph.Builder[Unit]]): RunnableFlow[Unit] = + def closed(block: function.Procedure[FlowGraph.Builder[Unit]]): RunnableGraph[Unit] = scaladsl.FlowGraph.closed() { b ⇒ block.apply(b.asJava) } /** @@ -37,7 +37,7 @@ trait GraphCreate { */ @throws(classOf[IllegalArgumentException]) def closed[S1 <: Shape, M](g1: Graph[S1, M], - block: function.Procedure2[FlowGraph.Builder[M], S1]): RunnableFlow[M] = + block: function.Procedure2[FlowGraph.Builder[M], S1]): RunnableGraph[M] = scaladsl.FlowGraph.closed(g1) { b ⇒ s => block.apply(b.asJava, s) } /** @@ -58,7 +58,7 @@ trait GraphCreate { */ @throws(classOf[IllegalArgumentException]) def closed[S1 <: Shape, S2 <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: function.Function2[M1, M2, M], - block: function.Procedure3[FlowGraph.Builder[M], S1, S2]): RunnableFlow[M] = + block: function.Procedure3[FlowGraph.Builder[M], S1, S2]): RunnableGraph[M] = scaladsl.FlowGraph.closed(g1, g2)(combineMat.apply _) { b => (s1, s2) => block.apply(b.asJava, s1, s2) } /** @@ -79,7 +79,7 @@ trait GraphCreate { */ @throws(classOf[IllegalArgumentException]) def closed1[[#S1 <: Shape#], [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: function.Function1[[#M1#], M], - block: function.Procedure2[FlowGraph.Builder[M], [#S1#]]): RunnableFlow[M] = + block: function.Procedure2[FlowGraph.Builder[M], [#S1#]]): RunnableGraph[M] = scaladsl.FlowGraph.closed([#g1#])(combineMat.apply _) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) } /** diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index b46188ee1e..d6ea9b1405 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -14,7 +14,7 @@ trait GraphApply { * * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. */ - def closed()(buildBlock: (FlowGraph.Builder[Unit]) ⇒ Unit): RunnableFlow[Unit] = { + def closed()(buildBlock: (FlowGraph.Builder[Unit]) ⇒ Unit): RunnableGraph[Unit] = { val builder = new FlowGraph.Builder buildBlock(builder) builder.buildRunnable() @@ -39,7 +39,7 @@ trait GraphApply { * * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. */ - def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ Unit): RunnableFlow[Mat] = { + def closed[Mat](g1: Graph[Shape, Mat])(buildBlock: FlowGraph.Builder[Mat] ⇒ (g1.Shape) ⇒ Unit): RunnableGraph[Mat] = { val builder = new FlowGraph.Builder val p1 = builder.add(g1) buildBlock(builder)(p1) @@ -68,7 +68,7 @@ trait GraphApply { * * The created graph must have all ports connected, otherwise an [[IllegalArgumentException]] is thrown. */ - def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableFlow[Mat] = { + def closed[Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) ⇒ Mat)(buildBlock: FlowGraph.Builder[Mat] ⇒ ([#g1.Shape#]) ⇒ Unit): RunnableGraph[Mat] = { val builder = new FlowGraph.Builder val curried = combineMat.curried val s##1 = builder.add(g##1, (m##1: M##1) ⇒ curried(m##1)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 3bb7520087..d8a005676b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -65,12 +65,12 @@ private[akka] case class ActorMaterializerImpl( } } - override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = { + override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = { if (haveShutDown.get()) throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.") - if (StreamLayout.Debug) runnableFlow.module.validate() + if (StreamLayout.Debug) runnableGraph.module.validate() - val session = new MaterializerSession(runnableFlow.module) { + val session = new MaterializerSession(runnableGraph.module) { private val flowName = createFlowName() private var nextId = 0 private def stageName(attr: Attributes): String = { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f3567c8025..9351d104e9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -95,16 +95,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph new Sink(delegate.toMat(sink)(combinerToScala(combine))) /** - * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] + * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]] */ - def join[M](flow: Graph[FlowShape[Out, In], M]): javadsl.RunnableFlow[Mat] = - new RunnableFlowAdapter(delegate.join(flow)) + def join[M](flow: Graph[FlowShape[Out, In], M]): javadsl.RunnableGraph[Mat] = + new RunnableGraphAdapter(delegate.join(flow)) /** - * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] + * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]] */ def joinMat[M, M2](flow: Graph[FlowShape[Out, In], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = - new RunnableFlowAdapter(delegate.joinMat(flow)(combinerToScala(combine))) + new RunnableGraphAdapter(delegate.joinMat(flow)(combinerToScala(combine))) /** * Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack: @@ -789,28 +789,28 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * * Flow with attached input and output, can be executed. */ -trait RunnableFlow[+Mat] extends Graph[ClosedShape, Mat] { +trait RunnableGraph[+Mat] extends Graph[ClosedShape, Mat] { /** * Run this flow and return the materialized values of the flow. */ def run(materializer: Materializer): Mat /** - * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. + * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were. */ - def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] + def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraph[Mat2] } /** INTERNAL API */ -private[akka] class RunnableFlowAdapter[Mat](runnable: scaladsl.RunnableFlow[Mat]) extends RunnableFlow[Mat] { +private[akka] class RunnableGraphAdapter[Mat](runnable: scaladsl.RunnableGraph[Mat]) extends RunnableGraph[Mat] { def shape = ClosedShape def module = runnable.module - override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableFlow[Mat2] = - new RunnableFlowAdapter(runnable.mapMaterializedValue(f.apply _)) + override def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): RunnableGraph[Mat2] = + new RunnableGraphAdapter(runnable.mapMaterializedValue(f.apply _)) override def run(materializer: Materializer): Mat = runnable.run()(materializer) - override def withAttributes(attr: Attributes): RunnableFlow[Mat] = - new RunnableFlowAdapter(runnable.withAttributes(attr)) + override def withAttributes(attr: Attributes): RunnableGraph[Mat] = + new RunnableGraphAdapter(runnable.withAttributes(attr)) - override def named(name: String): RunnableFlow[Mat] = - new RunnableFlowAdapter(runnable.named(name)) + override def named(name: String): RunnableGraph[Mat] = + new RunnableGraphAdapter(runnable.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index aad3b25698..f20132062b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -260,14 +260,14 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ - def to[M](sink: Graph[SinkShape[Out], M]): javadsl.RunnableFlow[Mat] = - new RunnableFlowAdapter(delegate.to(sink)) + def to[M](sink: Graph[SinkShape[Out], M]): javadsl.RunnableGraph[Mat] = + new RunnableGraphAdapter(delegate.to(sink)) /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. */ def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableFlow[M2] = - new RunnableFlowAdapter(delegate.toMat(sink)(combinerToScala(combine))) + new RunnableGraphAdapter(delegate.toMat(sink)(combinerToScala(combine))) /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index bedcc68ac5..fa558ffe3a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -134,7 +134,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) new Flow(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** - * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]]. + * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]. * {{{ * +------+ +-------+ * | | ~Out~> | | @@ -146,10 +146,10 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * value of the current flow (ignoring the other Flow’s value), use * [[Flow#joinMat[Mat2* joinMat]] if a different strategy is needed. */ - def join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableFlow[Mat] = joinMat(flow)(Keep.left) + def join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableGraph[Mat] = joinMat(flow)(Keep.left) /** - * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableFlow]] + * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]] * {{{ * +------+ +-------+ * | | ~Out~> | | @@ -160,9 +160,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) * The `combine` function is used to compose the materialized values of this flow and that * Flow into the materialized value of the resulting Flow. */ - def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableFlow[Mat3] = { + def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = { val flowCopy = flow.module.carbonCopy - RunnableFlow( + RunnableGraph( module .grow(flowCopy, combine) .connect(shape.outlet, flowCopy.shape.inlets.head) @@ -318,14 +318,14 @@ object Flow extends FlowApply { /** * Flow with attached input and output, can be executed. */ -case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] { +case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] { assert(module.isRunnable) def shape = ClosedShape /** - * Transform only the materialized value of this RunnableFlow, leaving all other properties as they were. + * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were. */ - def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableFlow[Mat2] = + def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableGraph[Mat2] = copy(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) /** @@ -333,10 +333,10 @@ case class RunnableFlow[+Mat](private[stream] val module: StreamLayout.Module) e */ def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) - override def withAttributes(attr: Attributes): RunnableFlow[Mat] = - new RunnableFlow(module.withAttributes(attr).wrap) + override def withAttributes(attr: Attributes): RunnableGraph[Mat] = + new RunnableGraph(module.withAttributes(attr).wrap) - override def named(name: String): RunnableFlow[Mat] = withAttributes(Attributes.name(name)) + override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 18b3024340..75d0e3a8b2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -380,13 +380,13 @@ object FlowGraph extends GraphApply { .connect(port, op.inPort) } - private[stream] def buildRunnable[Mat](): RunnableFlow[Mat] = { + private[stream] def buildRunnable[Mat](): RunnableGraph[Mat] = { if (!moduleInProgress.isRunnable) { throw new IllegalArgumentException( - "Cannot build the RunnableFlow because there are unconnected ports: " + + "Cannot build the RunnableGraph because there are unconnected ports: " + (moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", ")) } - new RunnableFlow(moduleInProgress.wrap()) + new RunnableGraph(moduleInProgress.wrap()) } private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 8975f112e6..4fbf948f47 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -70,15 +70,15 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * concatenating the processing steps of both. */ - def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableFlow[Mat] = toMat(sink)(Keep.left) + def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left) /** * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]], * concatenating the processing steps of both. */ - def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableFlow[Mat3] = { + def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = { val sinkCopy = sink.module.carbonCopy - RunnableFlow(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)) + RunnableGraph(module.growConnect(sinkCopy, shape.outlet, sinkCopy.shape.inlets.head, combine)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala index f02b7abf11..4ddad29402 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/package.scala @@ -21,8 +21,8 @@ package akka.stream * [[org.reactivestreams.Subscriber]]. A flow with an attached output and open input * is also a [[Sink]]. * - * If a flow has both an attached input and an attached output it becomes a [[RunnableFlow]]. - * In order to execute this pipeline the flow must be materialized by calling [[RunnableFlow#run]] on it. + * If a flow has both an attached input and an attached output it becomes a [[RunnableGraph]]. + * In order to execute this pipeline the flow must be materialized by calling [[RunnableGraph#run]] on it. * * You can create your `Source`, `Flow` and `Sink` in any order and then wire them together before * they are materialized by connecting them using [[Flow#via]] and [[Flow#to]], or connecting them into a