diff --git a/akka-docs-dev/rst/images/asyncBoundary.png b/akka-docs-dev/rst/images/asyncBoundary.png new file mode 100644 index 0000000000..337283728d Binary files /dev/null and b/akka-docs-dev/rst/images/asyncBoundary.png differ diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index 4ddd97e949..b5cb0a57c0 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ b/akka-docs-dev/rst/java/stream-customize.rst @@ -414,7 +414,7 @@ initialization. The buffer has demand for up to two elements without any downstr The following code example demonstrates a buffer class corresponding to the message sequence chart above. -.. includecode:: code/docs/stream/GraphStageDocSpec.scala#detached +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#detached Thread safety of custom processing stages ========================================= diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst index 1dc89ef952..941112663e 100644 --- a/akka-docs-dev/rst/java/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -49,7 +49,8 @@ can hand it back for further use to an underlying thread-pool. .. _defining-and-running-streams-java: Defining and running streams ----------------------------- +============================ + Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: Source @@ -110,7 +111,7 @@ to refer to the future: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#stream-reuse Defining sources, sinks and flows -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +--------------------------------- The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following examples show some of the most useful constructs (refer to the API documentation for more details): @@ -122,7 +123,8 @@ There are various ways to wire up different parts of a stream, the following exa .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-connecting Illegal stream elements -^^^^^^^^^^^^^^^^^^^^^^^ +----------------------- + In accordance to the Reactive Streams specification (`Rule 2.13 `_) Akka Streams do not allow ``null`` to be passed through the stream as an element. In case you want to model the concept of absence of a value we recommend using ``akka.japi.Option`` (for Java 6 and 7) or ``java.util.Optional`` which is available since Java 8. @@ -130,7 +132,8 @@ of absence of a value we recommend using ``akka.japi.Option`` (for Java 6 and 7) .. _back-pressure-explained-java: Back-pressure explained ------------------------ +======================= + Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_ specification, which Akka is a founding member of. @@ -164,7 +167,8 @@ with the upstream production rate or not. To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them: Slow Publisher, fast Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the happy case of course – we do not need to slow down the Publisher in this case. However signalling rates are rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled @@ -180,7 +184,8 @@ As we can see, in this scenario we effectively operate in so called push-mode si elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements. Fast Publisher, slow Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with the rate at which its upstream would like to emit data elements. @@ -198,7 +203,7 @@ this mode of operation is referred to as pull-based back-pressure. .. _stream-materialization-java: Stream Materialization ----------------------- +====================== When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan. Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources @@ -220,8 +225,62 @@ which will be running on the thread pools they have been configured to run on - .. _flow-combine-mat-java: +Operator Fusion +--------------- + +Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that +the processing steps of a flow or stream graph can be executed within the same Actor and has three +consequences: + + * starting up a stream may take longer than before due to executing the fusion algorithm + * passing elements from one processing stage to the next is a lot faster between fused + stages due to avoiding the asynchronous messaging overhead + * fused stream processing stages do no longer run in parallel to each other, meaning that + only up to one CPU core is used for each fused part + +The first point can be countered by pre-fusing and then reusing a stream blueprint as sketched below: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#explicit-fusing + +In order to balance the effects of the second and third bullet points you will have to insert asynchronous +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that +shall communicate with the rest of the graph in an asynchronous fashion. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-async + +In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding +and multiplying integers is an extremely costly operation this will lead to a performance gain since two CPUs can +work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a +flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work +by adding information to the flow graph that has been constructed up to this point: + +| + +.. image:: ../images/asyncBoundary.png + :align: center + :width: 700 + +| + +This means that everything that is inside the red bubble will be executed by one actor and everything outside of it +by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all +processing stages that have been added since them. + +.. warning:: + + Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer + that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers + may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer + there, data elements are passed without buffering between fused stages. In those cases where buffering + is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the + ``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function. + +The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``. +In that case you can still manually fuse those graphs which shall run on less Actors. With the exception of the +:class:`SslTlsStage` and the ``groupBy`` operator all built-in processing stages can be fused. + Combining materialized values -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +----------------------------- Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary to somehow express how these values should be composed to a final value when we plug these stages together. For this, diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 9ba042651c..8e0a3ea344 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -220,4 +220,29 @@ class FlowDocSpec extends AkkaSpec { //#flow-mat-combine } + + "explicit fusing" in { + //#explicit-fusing + import akka.stream.Fusing + + val flow = Flow[Int].map(_ * 2).filter(_ > 500) + val fused = Fusing.aggressive(flow) + + Source.fromIterator { () => Iterator from 0 } + .via(fused) + .take(1000) + //#explicit-fusing + } + + "defining asynchronous boundaries" in { + //#flow-async + import akka.stream.Attributes.asyncBoundary + + Source(List(1, 2, 3)) + .map(_ + 1) + .withAttributes(asyncBoundary) + .map(_ * 2) + .to(Sink.ignore) + //#flow-async + } } diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index acb765107e..285618b5c3 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -49,7 +49,8 @@ can hand it back for further use to an underlying thread-pool. .. _defining-and-running-streams-scala: Defining and running streams ----------------------------- +============================ + Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: Source @@ -114,7 +115,7 @@ to refer to the future: .. includecode:: code/docs/stream/FlowDocSpec.scala#stream-reuse Defining sources, sinks and flows -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +--------------------------------- The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following examples show some of the most useful constructs (refer to the API documentation for more details): @@ -126,7 +127,8 @@ There are various ways to wire up different parts of a stream, the following exa .. includecode:: code/docs/stream/FlowDocSpec.scala#flow-connecting Illegal stream elements -^^^^^^^^^^^^^^^^^^^^^^^ +----------------------- + In accordance to the Reactive Streams specification (`Rule 2.13 `_) Akka Streams do not allow ``null`` to be passed through the stream as an element. In case you want to model the concept of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either``. @@ -134,7 +136,8 @@ of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either .. _back-pressure-explained-scala: Back-pressure explained ------------------------ +======================= + Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_ specification, which Akka is a founding member of. @@ -168,7 +171,8 @@ with the upstream production rate or not. To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them: Slow Publisher, fast Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the happy case of course – we do not need to slow down the Publisher in this case. However signalling rates are rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled @@ -184,7 +188,8 @@ As we can see, in this scenario we effectively operate in so called push-mode si elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements. Fast Publisher, slow Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with the rate at which its upstream would like to emit data elements. @@ -202,7 +207,7 @@ this mode of operation is referred to as pull-based back-pressure. .. _stream-materialization-scala: Stream Materialization ----------------------- +====================== When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan. Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources @@ -222,10 +227,64 @@ which will be running on the thread pools they have been configured to run on - Reusing *instances* of linear computation stages (Source, Sink, Flow) inside composite Graphs is legal, yet will materialize that stage multiple times. +Operator Fusion +--------------- + +Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that +the processing steps of a flow or stream graph can be executed within the same Actor and has three +consequences: + + * starting up a stream may take longer than before due to executing the fusion algorithm + * passing elements from one processing stage to the next is a lot faster between fused + stages due to avoiding the asynchronous messaging overhead + * fused stream processing stages do no longer run in parallel to each other, meaning that + only up to one CPU core is used for each fused part + +The first point can be countered by pre-fusing and then reusing a stream blueprint as sketched below: + +.. includecode:: code/docs/stream/FlowDocSpec.scala#explicit-fusing + +In order to balance the effects of the second and third bullet points you will have to insert asynchronous +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that +shall communicate with the rest of the graph in an asynchronous fashion. + +.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-async + +In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding +and multiplying integers is an extremely costly operation this will lead to a performance gain since two CPUs can +work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a +flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work +by adding information to the flow graph that has been constructed up to this point: + +| + +.. image:: ../images/asyncBoundary.png + :align: center + :width: 700 + +| + +This means that everything that is inside the red bubble will be executed by one actor and everything outside of it +by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all +processing stages that have been added since them. + +.. warning:: + + Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer + that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers + may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer + there, data elements are passed without buffering between fused stages. In those cases where buffering + is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the + ``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function. + +The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``. +In that case you can still manually fuse those graphs which shall run on less Actors. With the exception of the +:class:`SslTlsStage` and the ``groupBy`` operator all built-in processing stages can be fused. + .. _flow-combine-mat-scala: Combining materialized values -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +----------------------------- Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary to somehow express how these values should be composed to a final value when we plug these stages together. For this, diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 3b15ce7b36..d20f398a34 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -272,4 +272,10 @@ public class BidiFlowTest extends StreamTest { Arrays.sort(rr); assertArrayEquals(new Long[] { 3L, 12L }, rr); } + + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final BidiFlow b = + bidi.withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 61c7eef48a..bc1d87864e 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -785,4 +785,9 @@ public class FlowTest extends StreamTest { assertEquals((Object) 0, result); } + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final Flow f = + Flow.of(Integer.class).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index b0bbb22a13..80b688a549 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -13,16 +13,13 @@ import java.util.concurrent.TimeUnit; import akka.actor.ActorRef; import akka.japi.function.Function; import akka.japi.function.Procedure; -import akka.stream.Graph; -import akka.stream.UniformFanInShape; -import akka.stream.UniformFanOutShape; +import akka.stream.*; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import akka.stream.StreamTest; import akka.japi.function.Function2; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; @@ -101,4 +98,9 @@ public class SinkTest extends StreamTest { probe2.expectMsgEquals("done2"); } + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final Sink> s = + Sink. head().withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 82543d17d9..cca8422953 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -11,10 +11,7 @@ import akka.dispatch.OnSuccess; import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.japi.function.*; -import akka.stream.Graph; -import akka.stream.OverflowStrategy; -import akka.stream.StreamTest; -import akka.stream.UniformFanInShape; +import akka.stream.*; import akka.stream.impl.ConstantFun; import akka.stream.stage.*; import akka.stream.testkit.AkkaSpec; @@ -776,4 +773,9 @@ public class SourceTest extends StreamTest { assertEquals((Object) 0, result); } + public void mustSuitablyOverrideAttributeHandlingMethods() { + @SuppressWarnings("unused") + final Source f = + Source.single(42).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named(""); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala index 84f4abd3d3..a997dab019 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -105,7 +105,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple .sorted should ===(0 to 198 by 2) } - "use multiple actors when there are asynchronous boundaries in the subflows" in { + "use multiple actors when there are asynchronous boundaries in the subflows (manual)" in { def ref = { val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging] bus.logSource @@ -120,7 +120,26 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple .sorted should ===(0 to 9) val refs = receiveN(20) withClue(s"refs=\n${refs.mkString("\n")}") { - refs.toSet.size should ===(11) + refs.toSet.size should ===(11) // main flow + 10 subflows + } + } + + "use multiple actors when there are asynchronous boundaries in the subflows (combinator)" in { + def ref = { + val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging] + bus.logSource + } + val flow = Flow[Int].map(x ⇒ { testActor ! ref; x }) + Source(0 to 9) + .map(x ⇒ { testActor ! ref; x }) + .flatMapMerge(5, i ⇒ Source.single(i).viaAsync(flow)) + .grouped(1000) + .runWith(Sink.head) + .futureValue + .sorted should ===(0 to 9) + val refs = receiveN(20) + withClue(s"refs=\n${refs.mkString("\n")}") { + refs.toSet.size should ===(11) // main flow + 10 subflows } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index b05d906a0f..7e87519ca6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -111,6 +111,11 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { Await.result(r, 1.second).toSet should ===(Set(3L, 12L)) } + "suitably override attribute handling methods" in { + import Attributes._ + val b: BidiFlow[Int, Long, ByteString, String, Unit] = bidi.withAttributes(name("")).addAttributes(asyncBoundary).named("") + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 8caa8e8201..a93f305c04 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -586,6 +586,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } } } + + "suitably override attribute handling methods" in { + import Attributes._ + val f: Flow[Int, Int, Unit] = Flow[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + } } object TestException extends RuntimeException with NoStackTrace diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 708c6ee431..aaccae0480 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -3,9 +3,10 @@ */ package akka.stream.scaladsl -import akka.stream.{ SinkShape, ActorMaterializer } +import akka.stream._ import akka.stream.testkit.TestPublisher.ManualProbe import akka.stream.testkit._ +import scala.concurrent.Future class SinkSpec extends AkkaSpec { @@ -119,6 +120,10 @@ class SinkSpec extends AkkaSpec { } } + "suitably override attribute handling methods" in { + import Attributes._ + val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("") + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 21a8bdac02..6bcd64aa69 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -3,18 +3,20 @@ */ package akka.stream.scaladsl -import scala.concurrent.Await +import akka.testkit.DefaultTimeout +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{ Span, Millis } +import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ -import scala.util.{ Success, Failure } +import scala.util.Failure import scala.util.control.NoStackTrace -import akka.stream.{ SourceShape, ActorMaterializer } +import akka.stream._ import akka.stream.testkit._ -import akka.stream.impl.{ PublisherSource, ReactiveStreamsCompliance } -import scala.concurrent.Future -class SourceSpec extends AkkaSpec { +class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { implicit val materializer = ActorMaterializer() + implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) "Single Source" must { "produce element" in { @@ -213,10 +215,9 @@ class SourceSpec extends AkkaSpec { "Repeat Source" must { "repeat as long as it takes" in { - import GraphDSL.Implicits._ - val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head), 1.second) - result.size should ===(10000) - result.toSet should ===(Set(42)) + val f = Source.repeat(42).grouped(1000).runWith(Sink.head) + f.futureValue.size should ===(1000) + f.futureValue.toSet should ===(Set(42)) } } @@ -224,36 +225,53 @@ class SourceSpec extends AkkaSpec { val expected = List(9227465, 5702887, 3524578, 2178309, 1346269, 832040, 514229, 317811, 196418, 121393, 75025, 46368, 28657, 17711, 10946, 6765, 4181, 2584, 1597, 987, 610, 377, 233, 144, 89, 55, 34, 21, 13, 8, 5, 3, 2, 1, 1, 0) "generate a finite fibonacci sequence" in { - val source = Source.unfold((0, 1)) { + Source.unfold((0, 1)) { case (a, _) if a > 10000000 ⇒ None case (a, b) ⇒ Some((b, a + b) → a) - } - val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }, 1.second) - result should ===(expected) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs } + .futureValue should ===(expected) + } + + "terminate with a failure if there is an exception thrown" in { + val t = new RuntimeException("expected") + whenReady( + Source.unfold((0, 1)) { + case (a, _) if a > 10000000 ⇒ throw t + case (a, b) ⇒ Some((b, a + b) → a) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }.failed) { + _ should be theSameInstanceAs (t) + } } "generate a finite fibonacci sequence asynchronously" in { - val source = Source.unfoldAsync((0, 1)) { + Source.unfoldAsync((0, 1)) { case (a, _) if a > 10000000 ⇒ Future.successful(None) - case (a, b) ⇒ Future.successful(Some((b, a + b) → a)) - } - val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }, 1.second) - result should ===(expected) + case (a, b) ⇒ Future(Some((b, a + b) → a))(system.dispatcher) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs } + .futureValue should ===(expected) } - "generate an infinite fibonacci sequence" in { - val source = Source.unfoldInf((0, 1)) { - case (a, b) ⇒ (b, a + b) → a - } - val result = Await.result(source.take(36).runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }, 1.second) - result should ===(expected) + "generate an unbounded fibonacci sequence" in { + Source.unfoldInf((0, 1))({ case (a, b) ⇒ (b, a + b) → a }) + .take(36) + .runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs } + .futureValue should ===(expected) } } "Iterator Source" must { "properly iterate" in { - val result = Await.result(Source.fromIterator(() ⇒ Iterator.iterate(false)(!_)).grouped(10).runWith(Sink.head), 1.second) - result should ===(Seq(false, true, false, true, false, true, false, true, false, true)) + Source.fromIterator(() ⇒ Iterator.iterate(false)(!_)) + .grouped(10) + .runWith(Sink.head) + .futureValue should ===(Seq(false, true, false, true, false, true, false, true, false, true)) + } + } + + "A Source" must { + "suitably override attribute handling methods" in { + import Attributes._ + val s: Source[Int, Unit] = Source.single(42).withAttributes(asyncBoundary).addAttributes(none).named("") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index cb8951606f..9dbbbdd849 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -28,6 +28,12 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit], override def withAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] = new SubFlowImpl[In, Out, Mat, F, C](subFlow.withAttributes(attr), mergeBackFunction, finishFunction) + override def addAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] = + new SubFlowImpl[In, Out, Mat, F, C](subFlow.addAttributes(attr), mergeBackFunction, finishFunction) + + override def named(name: String): SubFlow[Out, Mat, F, C] = + new SubFlowImpl[In, Out, Mat, F, C](subFlow.named(name), mergeBackFunction, finishFunction) + override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth) def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala index 0a8d9217e7..5a873fe54f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala @@ -10,19 +10,13 @@ import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } /** - * Unfold `GraphStage` class - * @param s initial state - * @param f unfold function - * @tparam S state - * @tparam E element + * INTERNAL API */ -private[akka] class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { - - val out: Outlet[E] = Outlet("Unfold") - +private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { + val out: Outlet[E] = Outlet("Unfold.out") override val shape: SourceShape[E] = SourceShape(out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private[this] var state = s @@ -36,36 +30,27 @@ private[akka] class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphSta } }) } - } } /** - * UnfoldAsync `GraphStage` class - * @param s initial state - * @param f unfold function - * @tparam S state - * @tparam E element + * INTERNAL API */ -private[akka] class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { - - val out: Outlet[E] = Outlet("UnfoldAsync") - +private[akka] final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { + val out: Outlet[E] = Outlet("UnfoldAsync.out") override val shape: SourceShape[E] = SourceShape(out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private[this] var state = s - private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _ override def preStart() = { val ac = getAsyncCallback[Try[Option[(S, E)]]] { case Failure(ex) ⇒ fail(out, ex) case Success(None) ⇒ complete(out) - case Success(Some((newS, elem))) ⇒ { + case Success(Some((newS, elem))) ⇒ push(out, elem) state = newS - } } asyncHandler = ac.invoke } @@ -75,5 +60,4 @@ private[akka] class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) ext f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) }) } - } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index dd1bda470c..490932f5f0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -192,6 +192,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): BidiFlow[I1, O1, I2, O2, Mat2] = new BidiFlow(delegate.mapMaterializedValue(f.apply _)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = new BidiFlow(delegate.withAttributes(attr)) + + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ + override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 54f9477742..09cc873d2c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -116,6 +116,47 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.viaMat(flow)(combinerToScala(combine))) + /** + * Transform this [[Flow]] by appending the given processing steps, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * `viaMat` if a different strategy is needed. + */ + def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.viaAsync(flow)) + + /** + * Transform this [[Flow]] by appending the given processing steps, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. + */ + def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + new Flow(delegate.viaAsyncMat(flow)(combinerToScala(combine))) + /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. * {{{ @@ -1431,9 +1472,28 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): javadsl.Flow[In, Out, Mat] = new Flow(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index bff1bc4588..a74c4376c3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -259,9 +259,28 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] = new Sink(delegate.mapMaterializedValue(f.apply _)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): javadsl.Sink[In, Mat] = new Sink(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): javadsl.Sink[In, Mat] = + new Sink(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): javadsl.Sink[In, Mat] = new Sink(delegate.named(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 7baaddb18d..e2867f67c4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -352,6 +352,47 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = new Source(delegate.viaMat(flow)(combinerToScala(combine))) + /** + * Transform this [[Source]] by appending the given processing stages, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * {{{ + * +----------------------------+ + * | Resulting Source | + * | | + * | +------+ +------+ | + * | | | | | | + * | | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * `viaMat` if a different strategy is needed. + */ + def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Source[T, Mat] = + new Source(delegate.viaAsync(flow)) + + /** + * Transform this [[Source]] by appending the given processing stages, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * {{{ + * +----------------------------+ + * | Resulting Source | + * | | + * | +------+ +------+ | + * | | | | | | + * | | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. + */ + def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + new Source(delegate.viaAsyncMat(flow)(combinerToScala(combine))) + /** * Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both. * {{{ @@ -1599,9 +1640,28 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = new Source(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): javadsl.Source[Out, Mat] = + new Source(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 654c29e27a..2d1ed1f282 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -82,6 +82,29 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] = new SubFlow(delegate.via(flow)) + /** + * Transform this [[Flow]] by appending the given processing steps, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[Flow#viaMat viaMat]] if a different strategy is needed. + */ + def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] = + new SubFlow(delegate.viaAsync(flow)) + /** * Connect this [[SubFlow]] to a [[Sink]], concatenating the processing steps of both. * This means that all sub-flows that result from the previous sub-stream operator @@ -1040,9 +1063,28 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def initialDelay(delay: FiniteDuration): SubFlow[In, Out, Mat] = new SubFlow(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ def withAttributes(attr: Attributes): SubFlow[In, Out, Mat] = new SubFlow(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + def addAttributes(attr: Attributes): SubFlow[In, Out, Mat] = + new SubFlow(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ def named(name: String): SubFlow[In, Out, Mat] = new SubFlow(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 4e23a20864..3ca68defdb 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -61,7 +61,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source new Source(delegate.concatSubstreams) /** - * Transform this [[Flow]] by appending the given processing steps. + * Transform this [[SubSource]] by appending the given processing steps. * {{{ * +----------------------------+ * | Resulting Source | @@ -80,6 +80,27 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] = new SubSource(delegate.via(flow)) + /** + * Transform this [[SubSource]] by appending the given processing steps, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * {{{ + * +----------------------------+ + * | Resulting Source | + * | | + * | +------+ +------+ | + * | | | | | | + * | | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[Flow#viaMat viaMat]] if a different strategy is needed. + */ + def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] = + new SubSource(delegate.viaAsync(flow)) + /** * Connect this [[SubSource]] to a [[Sink]], concatenating the processing steps of both. * This means that all sub-flows that result from the previous sub-stream operator @@ -1039,9 +1060,28 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def initialDelay(delay: FiniteDuration): SubSource[Out, Mat] = new SubSource(delegate.initialDelay(delay)) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ def withAttributes(attr: Attributes): SubSource[Out, Mat] = new SubSource(delegate.withAttributes(attr)) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + def addAttributes(attr: Attributes): SubSource[Out, Mat] = + new SubSource(delegate.addAttributes(attr)) + + /** + * Add a ``name`` attribute to this Flow. + */ def named(name: String): SubSource[Out, Mat] = new SubSource(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index fd12c42a8f..9b652997e8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -128,9 +128,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): BidiFlow[I1, O1, I2, O2, Mat2] = new BidiFlow(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = new BidiFlow(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] = + withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = withAttributes(Attributes.name(name)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index ed1c966b65..7da680c159 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -202,14 +202,27 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) } /** - * Change the attributes of this [[Flow]] to the given ones. Note that this + * Change the attributes of this [[Flow]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this * operation has no effect on an empty Flow (because the attributes apply * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Repr[Out] = - if (this.module eq EmptyModule) this + if (isIdentity) this else new Flow(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Flow. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) /** @@ -369,6 +382,26 @@ trait FlowOps[+Out, +Mat] { */ def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] + /** + * Transform this [[Flow]] by appending the given processing steps, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[Flow#viaMat viaMat]] if a different strategy is needed. + */ + def viaAsync[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = via(flow.addAttributes(Attributes.asyncBoundary)) + /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. @@ -1585,7 +1618,9 @@ trait FlowOps[+Out, +Mat] { def withAttributes(attr: Attributes): Repr[Out] - def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) + def addAttributes(attr: Attributes): Repr[Out] + + def named(name: String): Repr[Out] /** INTERNAL API */ private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] = @@ -1620,6 +1655,26 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { */ def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[T, Mat3] + /** + * Transform this [[Flow]] by appending the given processing steps, ensuring + * that an `asyncBoundary` attribute is set around those steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. + */ + def viaAsyncMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[T, Mat3] = + viaMat(flow.addAttributes(Attributes.asyncBoundary))(combine) + /** * Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both. * {{{ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 0caa93b1b8..1a2d07037c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -904,6 +904,12 @@ object GraphDSL extends GraphApply { override def withAttributes(attr: Attributes): Repr[Out] = throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + override def addAttributes(attr: Attributes): Repr[Out] = + throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + + override def named(name: String): Repr[Out] = + throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port") + override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 68a9cbaf6f..862f32b882 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -40,9 +40,28 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = new Sink(module.transformMaterializedValue(f.asInstanceOf[Any ⇒ Any])) + /** + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ override def withAttributes(attr: Attributes): Sink[In, Mat] = new Sink(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): Sink[In, Mat] = + withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name)) /** Converts this Scala DSL element to it's Java DSL counterpart. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index cb648c947d..82586f5b99 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -110,13 +110,26 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f)) /** - * Nests the current Source and returns a Source with the given Attributes - * @param attr the attributes to add - * @return a new Source with the added attributes + * Change the attributes of this [[Source]] to the given ones and seal the list + * of attributes. This means that further calls will not be able to remove these + * attributes, but instead add new ones. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Repr[Out] = - new Source(module.withAttributes(attr).nest()) // User API + new Source(module.withAttributes(attr).nest()) + /** + * Add the given attributes to this Source. Further calls to `withAttributes` + * will not remove these attributes. Note that this + * operation has no effect on an empty Flow (because the attributes apply + * only to the contained processing stages). + */ + override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr) + + /** + * Add a ``name`` attribute to this Flow. + */ override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name)) /** Converts this Scala DSL element to it's Java DSL counterpart. */