From 4e49d75ad8a569b9de9c464ec9be4e603a1e2a8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Wed, 10 Feb 2016 13:56:38 +0100 Subject: [PATCH] !str #19732 Concise and consistent way to mark async boundaries --- .../java/code/docs/stream/FlowDocTest.java | 3 +- .../java/code/docs/stream/MigrationsJava.java | 6 +++ .../stream/migration-guide-2.0-2.4-java.rst | 15 ++++++ .../java/stream/stream-flows-and-basics.rst | 5 +- .../scala/code/docs/stream/FlowDocSpec.scala | 5 +- .../code/docs/stream/MigrationsScala.scala | 5 ++ .../stream/migration-guide-2.0-2.4-scala.rst | 15 ++++++ .../scala/stream/stream-flows-and-basics.rst | 5 +- .../test/scala/akka/stream/FusingSpec.scala | 6 +-- .../akka/stream/scaladsl/BidiFlowSpec.scala | 2 +- .../scala/akka/stream/scaladsl/FlowSpec.scala | 2 +- .../scala/akka/stream/scaladsl/SinkSpec.scala | 2 +- .../akka/stream/scaladsl/SourceSpec.scala | 2 +- .../src/main/scala/akka/stream/Graph.scala | 5 ++ .../scala/akka/stream/impl/SubFlowImpl.scala | 2 + .../main/scala/akka/stream/javadsl/Flow.scala | 47 +++-------------- .../main/scala/akka/stream/javadsl/Sink.scala | 7 +++ .../scala/akka/stream/javadsl/Source.scala | 47 +++-------------- .../scala/akka/stream/javadsl/SubFlow.scala | 29 +++-------- .../scala/akka/stream/javadsl/SubSource.scala | 27 +++------- .../scala/akka/stream/scaladsl/BidiFlow.scala | 5 +- .../scala/akka/stream/scaladsl/Flow.scala | 51 ++++--------------- .../scala/akka/stream/scaladsl/Graph.scala | 12 ++--- .../scala/akka/stream/scaladsl/Sink.scala | 7 ++- .../scala/akka/stream/scaladsl/Source.scala | 7 ++- 25 files changed, 125 insertions(+), 194 deletions(-) diff --git a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java index 3be2f2bd21..0a5a775dd0 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java @@ -287,8 +287,7 @@ public class FlowDocTest { //#flow-async Source.range(1, 3) - .map(x -> x + 1) - .withAttributes(Attributes.asyncBoundary()) + .map(x -> x + 1).async() .map(x -> x * 2) .to(Sink.ignore()); //#flow-async diff --git a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java index 7ba4fd2516..27a7ced476 100644 --- a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java +++ b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java @@ -5,6 +5,7 @@ package docs.stream; import java.util.stream.Stream; +import akka.NotUsed; import akka.japi.Pair; import akka.stream.javadsl.*; //#asPublisher-import @@ -27,6 +28,11 @@ public class MigrationsJava { Sink.asPublisher(WITH_FANOUT); // instead of Sink.asPublisher(true) Sink.asPublisher(WITHOUT_FANOUT); // instead of Sink.asPublisher(false) //#asPublisher + + //#async + Flow flow = Flow.of(Integer.class).map(n -> n + 1); + Source.range(1, 10).via(flow.async()); + //#async } } \ No newline at end of file diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index 89206a3128..cae5059c07 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -104,6 +104,21 @@ Which is the same as using ``conflateWithSeed`` with an identity function:: Flow.of(Integer.class).conflateWithSeed(x -> x, (a, b) -> a + b) // Add numbers while downstream is not ready + +``viaAsync`` and ``viaAsyncMat`` has been replaced with ``async()`` +------------------------------------------------------------------- +``async()`` is available from ``Sink``, ``Source``, ``Flow`` and the sub flows. It provides a shortcut for +setting the attribute ``Attributes.asyncBoundary`` on a flow. The existing methods ``Flow.viaAsync`` and +``Flow.viaAsyncMat`` has been removed to make marking out asynchronous boundaries more consistent:: + + // This no longer works + source.viaAsync(flow) + +In Akka 2.4.x this will instead look lile this: + +.. includecode:: ../code/docs/stream/MigrationsJava.java#async + + Changed Sources / Sinks ======================= diff --git a/akka-docs/rst/java/stream/stream-flows-and-basics.rst b/akka-docs/rst/java/stream/stream-flows-and-basics.rst index 1243e517e8..13b1c6cc2b 100644 --- a/akka-docs/rst/java/stream/stream-flows-and-basics.rst +++ b/akka-docs/rst/java/stream/stream-flows-and-basics.rst @@ -243,8 +243,9 @@ The first point can be countered by pre-fusing and then reusing a stream bluepri .. includecode:: ../code/docs/stream/FlowDocTest.java#explicit-fusing In order to balance the effects of the second and third bullet points you will have to insert asynchronous -boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that -shall communicate with the rest of the graph in an asynchronous fashion. +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` using the method +``async`` on ``Source``, ``Sink`` and ``Flow`` to pieces that shall communicate with the rest of the graph in +an asynchronous fashion. .. includecode:: ../code/docs/stream/FlowDocTest.java#flow-async diff --git a/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala index c179fef9cd..7407d4d377 100644 --- a/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -237,11 +237,8 @@ class FlowDocSpec extends AkkaSpec { "defining asynchronous boundaries" in { //#flow-async - import akka.stream.Attributes.asyncBoundary - Source(List(1, 2, 3)) - .map(_ + 1) - .withAttributes(asyncBoundary) + .map(_ + 1).async .map(_ * 2) .to(Sink.ignore) //#flow-async diff --git a/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala b/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala index 9490976605..ea0448924d 100644 --- a/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala +++ b/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala @@ -23,6 +23,11 @@ class MigrationsScala extends AkkaSpec { }) }) //#expand-state + + //#async + val flow = Flow[Int].map(_ + 1) + Source(1 to 10).via(flow.async) + //#async } } } diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index 0afd5d2611..bc2def8486 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -91,6 +91,21 @@ Which is the same as using ``conflateWithSeed`` with an identity function Flow[Int].conflateWithSeed(identity)(_ + _) // Add numbers while downstream is not ready + +``viaAsync`` and ``viaAsyncMat`` has been replaced with ``async`` +----------------------------------------------------------------- +``async`` is available from ``Sink``, ``Source``, ``Flow`` and the sub flows. It provides a shortcut for +setting the attribute ``Attributes.asyncBoundary`` on a flow. The existing methods ``Flow.viaAsync`` and +``Flow.viaAsyncMat`` has been removed to make marking out asynchronous boundaries more consistent:: + + // This no longer works + source.viaAsync(flow) + +In Akka 2.4.x this will instead look lile this: + +.. includecode:: ../code/docs/stream/MigrationsScala.scala#async + + Changes in Akka HTTP ==================== diff --git a/akka-docs/rst/scala/stream/stream-flows-and-basics.rst b/akka-docs/rst/scala/stream/stream-flows-and-basics.rst index c9c6864b8c..21a8bc2f20 100644 --- a/akka-docs/rst/scala/stream/stream-flows-and-basics.rst +++ b/akka-docs/rst/scala/stream/stream-flows-and-basics.rst @@ -245,8 +245,9 @@ The first point can be countered by pre-fusing and then reusing a stream bluepri .. includecode:: ../code/docs/stream/FlowDocSpec.scala#explicit-fusing In order to balance the effects of the second and third bullet points you will have to insert asynchronous -boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that -shall communicate with the rest of the graph in an asynchronous fashion. +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` using the method +``async`` on ``Source``, ``Sink`` and ``Flow`` to pieces that shall communicate with the rest of the graph in an +asynchronous fashion. .. includecode:: ../code/docs/stream/FlowDocSpec.scala#flow-async diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala index cf41f231b0..999cb951d6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -95,7 +95,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple "SubFusingActorMaterializer" must { "work with asynchronous boundaries in the subflows" in { - val async = Flow[Int].map(_ * 2).withAttributes(Attributes.asyncBoundary) + val async = Flow[Int].map(_ * 2).async Source(0 to 9) .map(_ * 10) .flatMapMerge(5, i ⇒ Source(i to (i + 9)).via(async)) @@ -110,7 +110,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging] bus.logSource } - val async = Flow[Int].map(x ⇒ { testActor ! ref; x }).withAttributes(Attributes.asyncBoundary) + val async = Flow[Int].map(x ⇒ { testActor ! ref; x }).async Source(0 to 9) .map(x ⇒ { testActor ! ref; x }) .flatMapMerge(5, i ⇒ Source.single(i).via(async)) @@ -132,7 +132,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple val flow = Flow[Int].map(x ⇒ { testActor ! ref; x }) Source(0 to 9) .map(x ⇒ { testActor ! ref; x }) - .flatMapMerge(5, i ⇒ Source.single(i).viaAsync(flow)) + .flatMapMerge(5, i ⇒ Source.single(i).via(flow.async)) .grouped(1000) .runWith(Sink.head) .futureValue diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index c2574810db..2a472d7c11 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -114,7 +114,7 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "suitably override attribute handling methods" in { import Attributes._ - val b: BidiFlow[Int, Long, ByteString, String, NotUsed] = bidi.withAttributes(name("")).addAttributes(asyncBoundary).named("") + val b: BidiFlow[Int, Long, ByteString, String, NotUsed] = bidi.withAttributes(name("")).async.named("") } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 4a5a59649e..343bcae9fb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -596,7 +596,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "suitably override attribute handling methods" in { import Attributes._ - val f: Flow[Int, Int, NotUsed] = Flow[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + val f: Flow[Int, Int, NotUsed] = Flow[Int].async.addAttributes(none).named("") } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index be40b8907a..bcf1cc0ea5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -127,7 +127,7 @@ class SinkSpec extends AkkaSpec with ConversionCheckedTripleEquals with ScalaFut "suitably override attribute handling methods" in { import Attributes._ - val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("") } "support contramap" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 57b9484cae..257160b049 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -272,7 +272,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { "A Source" must { "suitably override attribute handling methods" in { import Attributes._ - val s: Source[Int, NotUsed] = Source.single(42).withAttributes(asyncBoundary).addAttributes(none).named("") + val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("") } } diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index e2358df4ad..048b7bcc4c 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -26,5 +26,10 @@ trait Graph[+S <: Shape, +M] { def named(name: String): Graph[S, M] = withAttributes(Attributes.name(name)) + /** + * Put an asynchronous boundary around this `Graph` + */ + def async: Graph[S, M] = addAttributes(Attributes.asyncBoundary) + def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(module.attributes and attr) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index de40baca43..fcd4f4494a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -34,6 +34,8 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, NotUsed], override def named(name: String): SubFlow[Out, Mat, F, C] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.named(name), mergeBackFunction, finishFunction) + override def async: Repr[Out] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.async, mergeBackFunction, finishFunction) + override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth) def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f924aac20f..be0887614b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -125,47 +125,6 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.viaMat(flow)(combinerToScala(combine))) - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * `viaMat` if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.viaAsync(flow)) - - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The `combine` function is used to compose the materialized values of this flow and that - * flow into the materialized value of the resulting Flow. - */ - def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = - new Flow(delegate.viaAsyncMat(flow)(combinerToScala(combine))) - /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. * {{{ @@ -1664,6 +1623,12 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends override def named(name: String): javadsl.Flow[In, Out, Mat] = new Flow(delegate.named(name)) + /** + * Put an asynchronous boundary around this `Flow` + */ + override def async: javadsl.Flow[In, Out, Mat] = + new Flow(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 995a6bb77a..fc67955262 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -306,4 +306,11 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink */ override def named(name: String): javadsl.Sink[In, Mat] = new Sink(delegate.named(name)) + + /** + * Put an asynchronous boundary around this `Sink` + */ + override def async: javadsl.Sink[In, Mat] = + new Sink(delegate.async) + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 653c344c46..0339ab8b5f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -376,47 +376,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.viaMat(flow)(combinerToScala(combine))) - /** - * Transform this [[Source]] by appending the given processing stages, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Source | - * | | - * | +------+ +------+ | - * | | | | | | - * | | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * `viaMat` if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Source[T, Mat] = - new Source(delegate.viaAsync(flow)) - - /** - * Transform this [[Source]] by appending the given processing stages, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Source | - * | | - * | +------+ +------+ | - * | | | | | | - * | | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The `combine` function is used to compose the materialized values of this flow and that - * flow into the materialized value of the resulting Flow. - */ - def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = - new Source(delegate.viaAsyncMat(flow)(combinerToScala(combine))) - /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. * {{{ @@ -1808,6 +1767,12 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap override def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) + /** + * Put an asynchronous boundary around this `Source` + */ + override def async: javadsl.Source[Out, Mat] = + new Source(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index aa61e8830b..e3eae7985b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -84,29 +84,6 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] = new SubFlow(delegate.via(flow)) - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * [[Flow#viaMat viaMat]] if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] = - new SubFlow(delegate.viaAsync(flow)) - /** * Connect this [[SubFlow]] to a [[Sink]], concatenating the processing steps of both. * This means that all sub-flows that result from the previous sub-stream operator @@ -1191,6 +1168,12 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def named(name: String): SubFlow[In, Out, Mat] = new SubFlow(delegate.named(name)) + /** + * Put an asynchronous boundary around this `SubFlow` + */ + def async: SubFlow[In, Out, Mat] = + new SubFlow(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 80faf6b12e..5d56882efe 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -82,27 +82,6 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] = new SubSource(delegate.via(flow)) - /** - * Transform this [[SubSource]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Source | - * | | - * | +------+ +------+ | - * | | | | | | - * | | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * [[Flow#viaMat viaMat]] if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] = - new SubSource(delegate.viaAsync(flow)) - /** * Connect this [[SubSource]] to a [[Sink]], concatenating the processing steps of both. * This means that all sub-flows that result from the previous sub-stream operator @@ -1188,6 +1167,12 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def named(name: String): SubSource[Out, Mat] = new SubSource(delegate.named(name)) + /** + * Put an asynchronous boundary around this `SubSource` + */ + def async: SubSource[Out, Mat] = + new SubSource(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 46a5e92f53..a0f071cc27 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -152,7 +152,10 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu * Add a ``name`` attribute to this Flow. */ override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = - withAttributes(Attributes.name(name)) + addAttributes(Attributes.name(name)) + + override def async: BidiFlow[I1, O1, I2, O2, Mat] = + addAttributes(Attributes.asyncBoundary) } object BidiFlow { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1ed7bf9590..76bed8a5f7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -214,7 +214,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) */ override def withAttributes(attr: Attributes): Repr[Out] = if (isIdentity) this - else new Flow(module.withAttributes(attr).nest()) + else new Flow(module.withAttributes(attr)) /** * Add the given attributes to this Flow. Further calls to `withAttributes` @@ -227,7 +227,12 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Add a ``name`` attribute to this Flow. */ - override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) + override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name)) + + /** + * Put an asynchronous boundary around this `Flow` + */ + override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains @@ -390,26 +395,6 @@ trait FlowOps[+Out, +Mat] { */ def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * [[Flow#viaMat viaMat]] if a different strategy is needed. - */ - def viaAsync[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = via(flow.addAttributes(Attributes.asyncBoundary)) - /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. @@ -1743,6 +1728,8 @@ trait FlowOps[+Out, +Mat] { def named(name: String): Repr[Out] + def async: Repr[Out] + /** INTERNAL API */ private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] = via(SymbolicGraphStage(op)) @@ -1787,26 +1774,6 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[T, Mat3] - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The `combine` function is used to compose the materialized values of this flow and that - * flow into the materialized value of the resulting Flow. - */ - def viaAsyncMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[T, Mat3] = - viaMat(flow.addAttributes(Attributes.asyncBoundary))(combine) - /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. * {{{ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 7cb117b495..a59aaf3de0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -1007,14 +1007,14 @@ object GraphDSL extends GraphApply { private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_]) extends PortOps[Out] { - override def withAttributes(attr: Attributes): Repr[Out] = - throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported + override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported + override def named(name: String): Repr[Out] = throw settingAttrNotSupported + override def async: Repr[Out] = throw settingAttrNotSupported - override def addAttributes(attr: Attributes): Repr[Out] = - throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + private def settingAttrNotSupported = + new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") - override def named(name: String): Repr[Out] = - throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 4c6b6db500..396298f44f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -72,7 +72,12 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) /** * Add a ``name`` attribute to this Flow. */ - override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name)) + override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name)) + + /** + * Put an asynchronous boundary around this `Sink` + */ + override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index ceb1fc01c3..802be68ccc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -145,7 +145,12 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) /** * Add a ``name`` attribute to this Flow. */ - override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) + override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name)) + + /** + * Put an asynchronous boundary around this `Source` + */ + override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)