diff --git a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java index 3be2f2bd21..0a5a775dd0 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java @@ -287,8 +287,7 @@ public class FlowDocTest { //#flow-async Source.range(1, 3) - .map(x -> x + 1) - .withAttributes(Attributes.asyncBoundary()) + .map(x -> x + 1).async() .map(x -> x * 2) .to(Sink.ignore()); //#flow-async diff --git a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java index 7ba4fd2516..27a7ced476 100644 --- a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java +++ b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java @@ -5,6 +5,7 @@ package docs.stream; import java.util.stream.Stream; +import akka.NotUsed; import akka.japi.Pair; import akka.stream.javadsl.*; //#asPublisher-import @@ -27,6 +28,11 @@ public class MigrationsJava { Sink.asPublisher(WITH_FANOUT); // instead of Sink.asPublisher(true) Sink.asPublisher(WITHOUT_FANOUT); // instead of Sink.asPublisher(false) //#asPublisher + + //#async + Flow flow = Flow.of(Integer.class).map(n -> n + 1); + Source.range(1, 10).via(flow.async()); + //#async } } \ No newline at end of file diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index 89206a3128..cae5059c07 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -104,6 +104,21 @@ Which is the same as using ``conflateWithSeed`` with an identity function:: Flow.of(Integer.class).conflateWithSeed(x -> x, (a, b) -> a + b) // Add numbers while downstream is not ready + +``viaAsync`` and ``viaAsyncMat`` has been replaced with ``async()`` +------------------------------------------------------------------- +``async()`` is available from ``Sink``, ``Source``, ``Flow`` and the sub flows. It provides a shortcut for +setting the attribute ``Attributes.asyncBoundary`` on a flow. The existing methods ``Flow.viaAsync`` and +``Flow.viaAsyncMat`` has been removed to make marking out asynchronous boundaries more consistent:: + + // This no longer works + source.viaAsync(flow) + +In Akka 2.4.x this will instead look lile this: + +.. includecode:: ../code/docs/stream/MigrationsJava.java#async + + Changed Sources / Sinks ======================= diff --git a/akka-docs/rst/java/stream/stream-flows-and-basics.rst b/akka-docs/rst/java/stream/stream-flows-and-basics.rst index 1243e517e8..13b1c6cc2b 100644 --- a/akka-docs/rst/java/stream/stream-flows-and-basics.rst +++ b/akka-docs/rst/java/stream/stream-flows-and-basics.rst @@ -243,8 +243,9 @@ The first point can be countered by pre-fusing and then reusing a stream bluepri .. includecode:: ../code/docs/stream/FlowDocTest.java#explicit-fusing In order to balance the effects of the second and third bullet points you will have to insert asynchronous -boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that -shall communicate with the rest of the graph in an asynchronous fashion. +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` using the method +``async`` on ``Source``, ``Sink`` and ``Flow`` to pieces that shall communicate with the rest of the graph in +an asynchronous fashion. .. includecode:: ../code/docs/stream/FlowDocTest.java#flow-async diff --git a/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala index c179fef9cd..7407d4d377 100644 --- a/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -237,11 +237,8 @@ class FlowDocSpec extends AkkaSpec { "defining asynchronous boundaries" in { //#flow-async - import akka.stream.Attributes.asyncBoundary - Source(List(1, 2, 3)) - .map(_ + 1) - .withAttributes(asyncBoundary) + .map(_ + 1).async .map(_ * 2) .to(Sink.ignore) //#flow-async diff --git a/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala b/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala index 9490976605..ea0448924d 100644 --- a/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala +++ b/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala @@ -23,6 +23,11 @@ class MigrationsScala extends AkkaSpec { }) }) //#expand-state + + //#async + val flow = Flow[Int].map(_ + 1) + Source(1 to 10).via(flow.async) + //#async } } } diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index 0afd5d2611..bc2def8486 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -91,6 +91,21 @@ Which is the same as using ``conflateWithSeed`` with an identity function Flow[Int].conflateWithSeed(identity)(_ + _) // Add numbers while downstream is not ready + +``viaAsync`` and ``viaAsyncMat`` has been replaced with ``async`` +----------------------------------------------------------------- +``async`` is available from ``Sink``, ``Source``, ``Flow`` and the sub flows. It provides a shortcut for +setting the attribute ``Attributes.asyncBoundary`` on a flow. The existing methods ``Flow.viaAsync`` and +``Flow.viaAsyncMat`` has been removed to make marking out asynchronous boundaries more consistent:: + + // This no longer works + source.viaAsync(flow) + +In Akka 2.4.x this will instead look lile this: + +.. includecode:: ../code/docs/stream/MigrationsScala.scala#async + + Changes in Akka HTTP ==================== diff --git a/akka-docs/rst/scala/stream/stream-flows-and-basics.rst b/akka-docs/rst/scala/stream/stream-flows-and-basics.rst index c9c6864b8c..21a8bc2f20 100644 --- a/akka-docs/rst/scala/stream/stream-flows-and-basics.rst +++ b/akka-docs/rst/scala/stream/stream-flows-and-basics.rst @@ -245,8 +245,9 @@ The first point can be countered by pre-fusing and then reusing a stream bluepri .. includecode:: ../code/docs/stream/FlowDocSpec.scala#explicit-fusing In order to balance the effects of the second and third bullet points you will have to insert asynchronous -boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that -shall communicate with the rest of the graph in an asynchronous fashion. +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` using the method +``async`` on ``Source``, ``Sink`` and ``Flow`` to pieces that shall communicate with the rest of the graph in an +asynchronous fashion. .. includecode:: ../code/docs/stream/FlowDocSpec.scala#flow-async diff --git a/akka-stream-tests/src/test/resources/reference.conf b/akka-stream-tests/src/test/resources/reference.conf index ffbc14a271..107a9f2c70 100644 --- a/akka-stream-tests/src/test/resources/reference.conf +++ b/akka-stream-tests/src/test/resources/reference.conf @@ -8,4 +8,5 @@ akka { akka.actor.warn-about-java-serializer-usage = false stream.materializer.debug.fuzzing-mode = on + stream.secret-test-fuzzing-warning-disable = 42 } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala index cf41f231b0..999cb951d6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -95,7 +95,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple "SubFusingActorMaterializer" must { "work with asynchronous boundaries in the subflows" in { - val async = Flow[Int].map(_ * 2).withAttributes(Attributes.asyncBoundary) + val async = Flow[Int].map(_ * 2).async Source(0 to 9) .map(_ * 10) .flatMapMerge(5, i ⇒ Source(i to (i + 9)).via(async)) @@ -110,7 +110,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging] bus.logSource } - val async = Flow[Int].map(x ⇒ { testActor ! ref; x }).withAttributes(Attributes.asyncBoundary) + val async = Flow[Int].map(x ⇒ { testActor ! ref; x }).async Source(0 to 9) .map(x ⇒ { testActor ! ref; x }) .flatMapMerge(5, i ⇒ Source.single(i).via(async)) @@ -132,7 +132,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple val flow = Flow[Int].map(x ⇒ { testActor ! ref; x }) Source(0 to 9) .map(x ⇒ { testActor ! ref; x }) - .flatMapMerge(5, i ⇒ Source.single(i).viaAsync(flow)) + .flatMapMerge(5, i ⇒ Source.single(i).via(flow.async)) .grouped(1000) .runWith(Sink.head) .futureValue diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index da05c2e2c4..4e68f3b3cd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -511,7 +511,9 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { downstream.requestOne() lastEvents() should be(Set(RequestOne)) - upstream.onComplete() + EventFilter[IllegalArgumentException](pattern = ".*Cannot pull closed port.*", occurrences = 1).intercept { + upstream.onComplete() + } val ev = lastEvents() ev.nonEmpty should be(true) ev.forall { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index c2574810db..2a472d7c11 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -114,7 +114,7 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { "suitably override attribute handling methods" in { import Attributes._ - val b: BidiFlow[Int, Long, ByteString, String, NotUsed] = bidi.withAttributes(name("")).addAttributes(asyncBoundary).named("") + val b: BidiFlow[Int, Long, ByteString, String, NotUsed] = bidi.withAttributes(name("")).async.named("") } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index 75ade88c1b..ad94d7d3e5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -4,7 +4,6 @@ package akka.stream.scaladsl import akka.NotUsed - import scala.collection.immutable import scala.concurrent.duration._ import akka.stream.ActorMaterializer @@ -14,6 +13,7 @@ import akka.stream.testkit.Utils._ import org.reactivestreams.Subscription import akka.testkit.TestProbe import org.reactivestreams.Subscriber +import akka.testkit.EventFilter class FlowIteratorSpec extends AbstractFlowIteratorSpec { override def testName = "A Flow based on an iterator producing function" @@ -40,7 +40,9 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { sub.request(1) c.expectNext(1) c.expectNoMsg(100.millis) - sub.request(2) + EventFilter[IllegalStateException](message = "not two", occurrences = 1).intercept { + sub.request(2) + } c.expectError().getMessage should be("not two") sub.request(2) c.expectNoMsg(100.millis) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 4a5a59649e..343bcae9fb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -596,7 +596,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "suitably override attribute handling methods" in { import Attributes._ - val f: Flow[Int, Int, NotUsed] = Flow[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + val f: Flow[Int, Int, NotUsed] = Flow[Int].async.addAttributes(none).named("") } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala index 6dd7130d00..b2df91f174 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } import org.reactivestreams.Publisher import scala.concurrent.duration._ +import akka.testkit.EventFilter class FlowZipWithSpec extends BaseTwoStreamsSetup { @@ -46,7 +47,9 @@ class FlowZipWithSpec extends BaseTwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - subscription.request(2) + EventFilter[ArithmeticException](occurrences = 1).intercept { + subscription.request(2) + } probe.expectError() match { case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala index bca40a7173..1c778547a4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala @@ -8,9 +8,9 @@ import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ import akka.stream.testkit._ import org.reactivestreams.Publisher - import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.testkit.EventFilter class GraphUnzipWithSpec extends AkkaSpec { @@ -174,7 +174,9 @@ class GraphUnzipWithSpec extends AkkaSpec { leftProbe.expectNext(1 / -1) rightProbe.expectNext("1/-1") - requestFromBoth() + EventFilter[ArithmeticException](occurrences = 1).intercept { + requestFromBoth() + } leftProbe.expectError() match { case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala index af77a0f652..bda0179180 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala @@ -3,6 +3,7 @@ package akka.stream.scaladsl import akka.stream.testkit._ import scala.concurrent.duration._ import akka.stream._ +import akka.testkit.EventFilter class GraphZipWithSpec extends TwoStreamsSetup { import GraphDSL.Implicits._ @@ -65,7 +66,9 @@ class GraphZipWithSpec extends TwoStreamsSetup { probe.expectNext(1 / -2) probe.expectNext(2 / -1) - subscription.request(2) + EventFilter[ArithmeticException](occurrences = 1).intercept { + subscription.request(2) + } probe.expectError() match { case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index be40b8907a..bcf1cc0ea5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -127,7 +127,7 @@ class SinkSpec extends AkkaSpec with ConversionCheckedTripleEquals with ScalaFut "suitably override attribute handling methods" in { import Attributes._ - val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("") } "support contramap" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 57b9484cae..0655f1d248 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -13,6 +13,7 @@ import scala.util.control.NoStackTrace import akka.stream._ import akka.stream.testkit._ import akka.NotUsed +import akka.testkit.EventFilter class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { @@ -235,13 +236,14 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { "terminate with a failure if there is an exception thrown" in { val t = new RuntimeException("expected") - whenReady( - Source.unfold((0, 1)) { - case (a, _) if a > 10000000 ⇒ throw t - case (a, b) ⇒ Some((b, a + b) → a) - }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }.failed) { - _ should be theSameInstanceAs (t) - } + EventFilter[RuntimeException](message = "expected", occurrences = 1) intercept + whenReady( + Source.unfold((0, 1)) { + case (a, _) if a > 10000000 ⇒ throw t + case (a, b) ⇒ Some((b, a + b) → a) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }.failed) { + _ should be theSameInstanceAs (t) + } } "generate a finite fibonacci sequence asynchronously" in { @@ -272,7 +274,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { "A Source" must { "suitably override attribute handling methods" in { import Attributes._ - val s: Source[Int, NotUsed] = Source.single(42).withAttributes(asyncBoundary).addAttributes(none).named("") + val s: Source[Int, NotUsed] = Source.single(42).async.addAttributes(none).named("") } } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 6af4c5ea84..c7aa13a5b3 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -15,7 +15,7 @@ trait GraphApply { def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] ⇒ S): Graph[S, NotUsed] = { val builder = new GraphDSL.Builder val s = buildBlock(builder) - val mod = builder.module.nest().replaceShape(s) + val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) } @@ -28,7 +28,7 @@ trait GraphApply { val builder = new GraphDSL.Builder val s1 = builder.add(g1) val s = buildBlock(builder)(s1) - val mod = builder.module.nest().replaceShape(s) + val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) } @@ -47,7 +47,7 @@ trait GraphApply { [2..#val s1 = builder.add(g1, (f: M1 ⇒ Any, m1: M1) ⇒ f(m1))# ] val s = buildBlock(builder)([#s1#]) - val mod = builder.module.nest().replaceShape(s) + val mod = builder.module.replaceShape(s) new GraphApply.GraphImpl(s, mod) }# @@ -63,7 +63,7 @@ private[stream] object GraphApply { extends Graph[S, Mat] { override def withAttributes(attr: Attributes): Graph[S, Mat] = - new GraphImpl(shape, module.withAttributes(attr).nest()) + new GraphImpl(shape, module.withAttributes(attr)) override def named(name: String): Graph[S, Mat] = withAttributes(Attributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index e2358df4ad..048b7bcc4c 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -26,5 +26,10 @@ trait Graph[+S <: Shape, +M] { def named(name: String): Graph[S, M] = withAttributes(Attributes.name(name)) + /** + * Put an asynchronous boundary around this `Graph` + */ + def async: Graph[S, M] = addAttributes(Attributes.asyncBoundary) + def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(module.attributes and attr) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index db581e1860..2d6ee38d94 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -36,7 +36,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, private val _logger = Logging.getLogger(system, this) override def logger = _logger - if (settings.fuzzingMode) { + if (settings.fuzzingMode && !system.settings.config.hasPath("akka.stream.secret-test-fuzzing-warning-disable")) { _logger.warning("Fuzzing mode is enabled on this system. If you see this warning on your production system then " + "set akka.stream.materializer.debug.fuzzing-mode to off.") } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 3c52cd29ee..4e4bf7393a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -136,6 +136,10 @@ object StreamLayout { /** * Verify that the given Shape has the same ports and return a new module with that shape. * Concrete implementations may throw UnsupportedOperationException where applicable. + * + * Please note that this method MUST NOT be implemented using a CopiedModule since + * the purpose of replaceShape can also be to rearrange the ports (as in BidiFlow.reversed) + * and that purpose would be defeated. */ def replaceShape(s: Shape): Module @@ -199,7 +203,7 @@ object StreamLayout { downstreams.updated(from, to), upstreams.updated(to, from), materializedValueComputation, - attributes) + if (isSealed) Attributes.none else attributes) } final def transformMaterializedValue(f: Any ⇒ Any): Module = { @@ -289,39 +293,20 @@ object StreamLayout { Attributes.none) } - /** - * Creates a new Module which contains `this` Module - * @return a new Module - */ - def nest(): Module = { - if (Debug) validate(this) - - CompositeModule( - Set(this), - shape, - /* - * Composite modules always maintain the flattened upstreams/downstreams map (i.e. they contain all the - * layout information of all the nested modules). Copied modules break the nesting, scoping them to the - * copied module. The MaterializerSession will take care of propagating the necessary Publishers and Subscribers - * from the enclosed scope to the outer scope. - */ - downstreams, - upstreams, - /* - * Wrapping like this shields the outer module from the details of the - * materialized value computation of its submodules. - */ - Atomic(this), - Attributes.none) - } - def subModules: Set[Module] - final def isSealed: Boolean = isAtomic || isCopied || isFused + final def isSealed: Boolean = isAtomic || isCopied || isFused || attributes.attributeList.nonEmpty def downstreams: Map[OutPort, InPort] = Map.empty def upstreams: Map[InPort, OutPort] = Map.empty def materializedValueComputation: MaterializedValueNode = Atomic(this) + + /** + * The purpose of this method is to create a copy to be included in a larger + * graph such that port identity clashes are avoided. Where a full copy is not + * possible or desirable, use a CopiedModule. The shape of the resulting + * module MUST NOT contain the same ports as this module’s shape. + */ def carbonCopy: Module def attributes: Attributes @@ -342,8 +327,6 @@ object StreamLayout { override def compose[A, B, C](that: Module, f: (A, B) ⇒ C): Module = throw new UnsupportedOperationException("It is invalid to combine materialized value with EmptyModule") - override def nest(): Module = this - override def subModules: Set[Module] = Set.empty override def withAttributes(attributes: Attributes): Module = @@ -368,7 +351,7 @@ object StreamLayout { override def replaceShape(s: Shape): Module = { shape.requireSamePortsAs(s) - copy(shape = s) + CompositeModule(this, s) } override val materializedValueComputation: MaterializedValueNode = Atomic(copyOf) @@ -379,12 +362,12 @@ object StreamLayout { } final case class CompositeModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes) extends Module { override def replaceShape(s: Shape): Module = { shape.requireSamePortsAs(s) @@ -404,14 +387,18 @@ object StreamLayout { |""".stripMargin } + object CompositeModule { + def apply(m: Module, s: Shape): CompositeModule = CompositeModule(Set(m), s, Map.empty, Map.empty, Atomic(m), Attributes.none) + } + final case class FusedModule( - override val subModules: Set[Module], - override val shape: Shape, - override val downstreams: Map[OutPort, InPort], - override val upstreams: Map[InPort, OutPort], - override val materializedValueComputation: MaterializedValueNode, - override val attributes: Attributes, - info: Fusing.StructuralInfo) extends Module { + override val subModules: Set[Module], + override val shape: Shape, + override val downstreams: Map[OutPort, InPort], + override val upstreams: Map[InPort, OutPort], + override val materializedValueComputation: MaterializedValueNode, + override val attributes: Attributes, + info: Fusing.StructuralInfo) extends Module { override def isFused: Boolean = true diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index de40baca43..fcd4f4494a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -34,6 +34,8 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, NotUsed], override def named(name: String): SubFlow[Out, Mat, F, C] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.named(name), mergeBackFunction, finishFunction) + override def async: Repr[Out] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.async, mergeBackFunction, finishFunction) + override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth) def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index a41f8c8fd8..1f82637e73 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -8,7 +8,7 @@ import akka.actor._ import akka.event.Logging import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance._ -import akka.stream.impl.StreamLayout.{ CopiedModule, Module } +import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module } import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly } import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } @@ -29,13 +29,9 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at override def subModules: Set[Module] = Set.empty override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr) - override final def carbonCopy: Module = { - val newShape = shape.deepCopy() - replaceShape(newShape) - } + override final def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) - override final def replaceShape(newShape: Shape): Module = - CopiedModule(newShape, attributes, copyOf = this) + override final def replaceShape(newShape: Shape): Module = CompositeModule(this, newShape) override def toString: String = s"GraphModule\n ${assembly.toString.replace("\n", "\n ")}\n shape=$shape, attributes=$attributes" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala index 9af78b8837..14f81c057d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala @@ -246,7 +246,7 @@ private[stream] object Fusing { struct: BuildStructuralInfo, openGroup: ju.Set[Module], indent: Int): List[(Module, MaterializedValueNode)] = { - def log(msg: String): Unit = println(indent + msg) + def log(msg: String): Unit = println(" " * indent + msg) val async = m match { case _: GraphStageModule ⇒ m.attributes.contains(AsyncBoundary) case _: GraphModule ⇒ m.attributes.contains(AsyncBoundary) @@ -275,7 +275,7 @@ private[stream] object Fusing { * - we need to register the contained modules but take care to not include the internal * wirings into the final result, see also `struct.removeInternalWires()` */ - if (Debug) log(s"graph module ${m.toString.replace("\n", "\n" + indent)}") + if (Debug) log(s"graph module ${m.toString.replace("\n", "\n" + " " * indent)}") // storing the old Shape in arrays for in-place updating as we clone the contained GraphStages val oldIns = oldShape.inlets.toArray @@ -356,7 +356,7 @@ private[stream] object Fusing { subMatBuilder ++= res } val subMat = subMatBuilder.result() - if (Debug) log(subMat.map(p ⇒ s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + indent, "\n " + indent, "")) + if (Debug) log(subMat.map(p ⇒ s"${p._1.getClass.getName}[${p._1.hashCode}] -> ${p._2}").mkString("subMat\n " + " " * indent, "\n " + " " * indent, "")) // we need to remove all wirings that this module copied from nested modules so that we // don’t do wirings twice val oldDownstreams = m match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 4ccc35bf5b..c51f059e2a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -25,10 +25,9 @@ import scala.util.Try private[akka] final case class GraphStageModule(shape: Shape, attributes: Attributes, stage: GraphStageWithMaterializedValue[Shape, Any]) extends Module { - def carbonCopy: Module = replaceShape(shape.deepCopy()) + def carbonCopy: Module = CopiedModule(shape.deepCopy(), Attributes.none, this) - def replaceShape(s: Shape): Module = - CopiedModule(s, Attributes.none, this) + def replaceShape(s: Shape): Module = CompositeModule(this, s) def subModules: Set[Module] = Set.empty diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index 913bdbae91..cc8c5e4488 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -9,7 +9,7 @@ import java.util.Optional import akka.{ NotUsed, japi } import akka.stream._ -import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.StreamLayout.{ Module, CompositeModule } import akka.util.ByteString import javax.net.ssl._ @@ -128,7 +128,7 @@ object SslTls { override def replaceShape(s: Shape) = if (s == shape) this - else if (shape.hasSamePortsAs(s)) copy(shape = s) + else if (shape.hasSamePortsAs(s)) CompositeModule(this, s) else throw new IllegalArgumentException("trying to replace shape with different ports") } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index bc7223de86..d3e92b1cae 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -126,47 +126,6 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.viaMat(flow)(combinerToScala(combine))) - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * `viaMat` if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.viaAsync(flow)) - - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The `combine` function is used to compose the materialized values of this flow and that - * flow into the materialized value of the resulting Flow. - */ - def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = - new Flow(delegate.viaAsyncMat(flow)(combinerToScala(combine))) - /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. * {{{ @@ -1685,6 +1644,12 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends override def named(name: String): javadsl.Flow[In, Out, Mat] = new Flow(delegate.named(name)) + /** + * Put an asynchronous boundary around this `Flow` + */ + override def async: javadsl.Flow[In, Out, Mat] = + new Flow(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 995a6bb77a..fc67955262 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -306,4 +306,11 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink */ override def named(name: String): javadsl.Sink[In, Mat] = new Sink(delegate.named(name)) + + /** + * Put an asynchronous boundary around this `Sink` + */ + override def async: javadsl.Sink[In, Mat] = + new Sink(delegate.async) + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 974d1ea494..c45fa5b216 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -376,47 +376,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.viaMat(flow)(combinerToScala(combine))) - /** - * Transform this [[Source]] by appending the given processing stages, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Source | - * | | - * | +------+ +------+ | - * | | | | | | - * | | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * `viaMat` if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Source[T, Mat] = - new Source(delegate.viaAsync(flow)) - - /** - * Transform this [[Source]] by appending the given processing stages, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Source | - * | | - * | +------+ +------+ | - * | | | | | | - * | | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The `combine` function is used to compose the materialized values of this flow and that - * flow into the materialized value of the resulting Flow. - */ - def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = - new Source(delegate.viaAsyncMat(flow)(combinerToScala(combine))) - /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. * {{{ @@ -1828,6 +1787,12 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap override def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) + /** + * Put an asynchronous boundary around this `Source` + */ + override def async: javadsl.Source[Out, Mat] = + new Source(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 5176084a10..2d80a6292d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -84,29 +84,6 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] = new SubFlow(delegate.via(flow)) - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * [[Flow#viaMat viaMat]] if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] = - new SubFlow(delegate.viaAsync(flow)) - /** * Connect this [[SubFlow]] to a [[Sink]], concatenating the processing steps of both. * This means that all sub-flows that result from the previous sub-stream operator @@ -1211,6 +1188,12 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def named(name: String): SubFlow[In, Out, Mat] = new SubFlow(delegate.named(name)) + /** + * Put an asynchronous boundary around this `SubFlow` + */ + def async: SubFlow[In, Out, Mat] = + new SubFlow(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index e804341e6f..673cdfe493 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -82,27 +82,6 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] = new SubSource(delegate.via(flow)) - /** - * Transform this [[SubSource]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Source | - * | | - * | +------+ +------+ | - * | | | | | | - * | | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * [[Flow#viaMat viaMat]] if a different strategy is needed. - */ - def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] = - new SubSource(delegate.viaAsync(flow)) - /** * Connect this [[SubSource]] to a [[Sink]], concatenating the processing steps of both. * This means that all sub-flows that result from the previous sub-stream operator @@ -1208,6 +1187,12 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def named(name: String): SubSource[Out, Mat] = new SubSource(delegate.named(name)) + /** + * Put an asynchronous boundary around this `SubSource` + */ + def async: SubSource[Out, Mat] = + new SubSource(delegate.async) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 46a5e92f53..1ab202d56a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -116,12 +116,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu /** * Turn this BidiFlow around by 180 degrees, logically flipping it upside down in a protocol stack. */ - def reversed: BidiFlow[I2, O2, I1, O1, Mat] = { - BidiFlow.fromGraph(GraphDSL.create(this) { implicit b ⇒ - reversed ⇒ - BidiShape(reversed.in2, reversed.out2, reversed.in1, reversed.out1) - }) - } + def reversed: BidiFlow[I2, O2, I1, O1, Mat] = new BidiFlow(module.replaceShape(BidiShape(shape.in2, shape.out2, shape.in1, shape.out1))) /** * Transform only the materialized value of this BidiFlow, leaving all other properties as they were. @@ -137,7 +132,7 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu * only to the contained processing stages). */ override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = - new BidiFlow(module.withAttributes(attr).nest()) + new BidiFlow(module.withAttributes(attr)) /** * Add the given attributes to this Source. Further calls to `withAttributes` @@ -152,7 +147,10 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu * Add a ``name`` attribute to this Flow. */ override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = - withAttributes(Attributes.name(name)) + addAttributes(Attributes.name(name)) + + override def async: BidiFlow[I1, O1, I2, O2, Mat] = + addAttributes(Attributes.asyncBoundary) } object BidiFlow { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d921814db1..5099a85440 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -28,7 +28,7 @@ import akka.NotUsed * A `Flow` is a set of stream processing steps that has one open input and one open output. */ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) - extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] { + extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] { override val shape: FlowShape[In, Out] = module.shape.asInstanceOf[FlowShape[In, Out]] @@ -214,7 +214,7 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) */ override def withAttributes(attr: Attributes): Repr[Out] = if (isIdentity) this - else new Flow(module.withAttributes(attr).nest()) + else new Flow(module.withAttributes(attr)) /** * Add the given attributes to this Flow. Further calls to `withAttributes` @@ -227,7 +227,12 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) /** * Add a ``name`` attribute to this Flow. */ - override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) + override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name)) + + /** + * Put an asynchronous boundary around this `Flow` + */ + override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains @@ -309,12 +314,12 @@ object Flow { fromSinkAndSourceMat(sink, source)(Keep.none) /** - * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input - * will be sent to the Sink and the Flow's output will come from the Source. - * - * The `combine` function is used to compose the materialized values of the `sink` and `source` - * into the materialized value of the resulting [[Flow]]. - */ + * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input + * will be sent to the Sink and the Flow's output will come from the Source. + * + * The `combine` function is used to compose the materialized values of the `sink` and `source` + * into the materialized value of the resulting [[Flow]]. + */ def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] = fromGraph(GraphDSL.create(sink, source)(combine) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) }) } @@ -349,7 +354,7 @@ final case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Mo def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) override def withAttributes(attr: Attributes): RunnableGraph[Mat] = - new RunnableGraph(module.withAttributes(attr).nest()) + new RunnableGraph(module.withAttributes(attr)) override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name)) } @@ -390,26 +395,6 @@ trait FlowOps[+Out, +Mat] { */ def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The materialized value of the combined [[Flow]] will be the materialized - * value of the current flow (ignoring the other Flow’s value), use - * [[Flow#viaMat viaMat]] if a different strategy is needed. - */ - def viaAsync[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = via(flow.addAttributes(Attributes.asyncBoundary)) - /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. @@ -963,8 +948,6 @@ trait FlowOps[+Out, +Mat] { def conflateWithSeed[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate)) - - /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the @@ -1763,6 +1746,8 @@ trait FlowOps[+Out, +Mat] { def named(name: String): Repr[Out] + def async: Repr[Out] + /** INTERNAL API */ private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] = via(SymbolicGraphStage(op)) @@ -1807,26 +1792,6 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[T, Mat3] - /** - * Transform this [[Flow]] by appending the given processing steps, ensuring - * that an `asyncBoundary` attribute is set around those steps. - * {{{ - * +----------------------------+ - * | Resulting Flow | - * | | - * | +------+ +------+ | - * | | | | | | - * In ~~> | this | ~Out~> | flow | ~~> T - * | | | | | | - * | +------+ +------+ | - * +----------------------------+ - * }}} - * The `combine` function is used to compose the materialized values of this flow and that - * flow into the materialized value of the resulting Flow. - */ - def viaAsyncMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[T, Mat3] = - viaMat(flow.addAttributes(Attributes.asyncBoundary))(combine) - /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. * {{{ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index c612b101a8..01b61eb14e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -1011,14 +1011,14 @@ object GraphDSL extends GraphApply { private class PortOpsImpl[+Out](override val outlet: Outlet[Out @uncheckedVariance], b: Builder[_]) extends PortOps[Out] { - override def withAttributes(attr: Attributes): Repr[Out] = - throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + override def withAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported + override def addAttributes(attr: Attributes): Repr[Out] = throw settingAttrNotSupported + override def named(name: String): Repr[Out] = throw settingAttrNotSupported + override def async: Repr[Out] = throw settingAttrNotSupported - override def addAttributes(attr: Attributes): Repr[Out] = - throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + private def settingAttrNotSupported = + new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") - override def named(name: String): Repr[Out] = - throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 4c6b6db500..691a4e4469 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -58,7 +58,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Sink[In, Mat] = - new Sink(module.withAttributes(attr).nest()) + new Sink(module.withAttributes(attr)) /** * Add the given attributes to this Source. Further calls to `withAttributes` @@ -72,7 +72,12 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) /** * Add a ``name`` attribute to this Flow. */ - override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name)) + override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name)) + + /** + * Put an asynchronous boundary around this `Sink` + */ + override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index ceb1fc01c3..d65ee7bbde 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -132,7 +132,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Repr[Out] = - new Source(module.withAttributes(attr).nest()) + new Source(module.withAttributes(attr)) /** * Add the given attributes to this Source. Further calls to `withAttributes` @@ -145,7 +145,12 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) /** * Add a ``name`` attribute to this Flow. */ - override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) + override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name)) + + /** + * Put an asynchronous boundary around this `Source` + */ + override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)