Merge pull request #19268 from akka/wip-2.0-fixes-RK

Wip 2.0 fixes rk
This commit is contained in:
Konrad Malawski 2015-12-22 23:43:04 +01:00
commit 984740198e
27 changed files with 643 additions and 88 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 445 KiB

View file

@ -414,7 +414,7 @@ initialization. The buffer has demand for up to two elements without any downstr
The following code example demonstrates a buffer class corresponding to the message sequence chart above.
.. includecode:: code/docs/stream/GraphStageDocSpec.scala#detached
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#detached
Thread safety of custom processing stages
=========================================

View file

@ -49,7 +49,8 @@ can hand it back for further use to an underlying thread-pool.
.. _defining-and-running-streams-java:
Defining and running streams
----------------------------
============================
Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:
Source
@ -110,7 +111,7 @@ to refer to the future:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#stream-reuse
Defining sources, sinks and flows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
---------------------------------
The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following
examples show some of the most useful constructs (refer to the API documentation for more details):
@ -122,7 +123,8 @@ There are various ways to wire up different parts of a stream, the following exa
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-connecting
Illegal stream elements
^^^^^^^^^^^^^^^^^^^^^^^
-----------------------
In accordance to the Reactive Streams specification (`Rule 2.13 <https://github.com/reactive-streams/reactive-streams-jvm#2.13>`_)
Akka Streams do not allow ``null`` to be passed through the stream as an element. In case you want to model the concept
of absence of a value we recommend using ``akka.japi.Option`` (for Java 6 and 7) or ``java.util.Optional`` which is available since Java 8.
@ -130,7 +132,8 @@ of absence of a value we recommend using ``akka.japi.Option`` (for Java 6 and 7)
.. _back-pressure-explained-java:
Back-pressure explained
-----------------------
=======================
Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_
specification, which Akka is a founding member of.
@ -164,7 +167,8 @@ with the upstream production rate or not.
To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them:
Slow Publisher, fast Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-------------------------------
This is the happy case of course we do not need to slow down the Publisher in this case. However signalling rates are
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled
@ -180,7 +184,8 @@ As we can see, in this scenario we effectively operate in so called push-mode si
elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements.
Fast Publisher, slow Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-------------------------------
This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
the rate at which its upstream would like to emit data elements.
@ -198,7 +203,7 @@ this mode of operation is referred to as pull-based back-pressure.
.. _stream-materialization-java:
Stream Materialization
----------------------
======================
When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
@ -220,8 +225,62 @@ which will be running on the thread pools they have been configured to run on -
.. _flow-combine-mat-java:
Operator Fusion
---------------
Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that
the processing steps of a flow or stream graph can be executed within the same Actor and has three
consequences:
* starting up a stream may take longer than before due to executing the fusion algorithm
* passing elements from one processing stage to the next is a lot faster between fused
stages due to avoiding the asynchronous messaging overhead
* fused stream processing stages do no longer run in parallel to each other, meaning that
only up to one CPU core is used for each fused part
The first point can be countered by pre-fusing and then reusing a stream blueprint as sketched below:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#explicit-fusing
In order to balance the effects of the second and third bullet points you will have to insert asynchronous
boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that
shall communicate with the rest of the graph in an asynchronous fashion.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-async
In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding
and multiplying integers is an extremely costly operation this will lead to a performance gain since two CPUs can
work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a
flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work
by adding information to the flow graph that has been constructed up to this point:
|
.. image:: ../images/asyncBoundary.png
:align: center
:width: 700
|
This means that everything that is inside the red bubble will be executed by one actor and everything outside of it
by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all
processing stages that have been added since them.
.. warning::
Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer
that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers
may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer
there, data elements are passed without buffering between fused stages. In those cases where buffering
is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the
``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function.
The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``.
In that case you can still manually fuse those graphs which shall run on less Actors. With the exception of the
:class:`SslTlsStage` and the ``groupBy`` operator all built-in processing stages can be fused.
Combining materialized values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-----------------------------
Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary
to somehow express how these values should be composed to a final value when we plug these stages together. For this,

View file

@ -220,4 +220,29 @@ class FlowDocSpec extends AkkaSpec {
//#flow-mat-combine
}
"explicit fusing" in {
//#explicit-fusing
import akka.stream.Fusing
val flow = Flow[Int].map(_ * 2).filter(_ > 500)
val fused = Fusing.aggressive(flow)
Source.fromIterator { () => Iterator from 0 }
.via(fused)
.take(1000)
//#explicit-fusing
}
"defining asynchronous boundaries" in {
//#flow-async
import akka.stream.Attributes.asyncBoundary
Source(List(1, 2, 3))
.map(_ + 1)
.withAttributes(asyncBoundary)
.map(_ * 2)
.to(Sink.ignore)
//#flow-async
}
}

View file

@ -49,7 +49,8 @@ can hand it back for further use to an underlying thread-pool.
.. _defining-and-running-streams-scala:
Defining and running streams
----------------------------
============================
Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:
Source
@ -114,7 +115,7 @@ to refer to the future:
.. includecode:: code/docs/stream/FlowDocSpec.scala#stream-reuse
Defining sources, sinks and flows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
---------------------------------
The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following
examples show some of the most useful constructs (refer to the API documentation for more details):
@ -126,7 +127,8 @@ There are various ways to wire up different parts of a stream, the following exa
.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-connecting
Illegal stream elements
^^^^^^^^^^^^^^^^^^^^^^^
-----------------------
In accordance to the Reactive Streams specification (`Rule 2.13 <https://github.com/reactive-streams/reactive-streams-jvm#2.13>`_)
Akka Streams do not allow ``null`` to be passed through the stream as an element. In case you want to model the concept
of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either``.
@ -134,7 +136,8 @@ of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either
.. _back-pressure-explained-scala:
Back-pressure explained
-----------------------
=======================
Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_
specification, which Akka is a founding member of.
@ -168,7 +171,8 @@ with the upstream production rate or not.
To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them:
Slow Publisher, fast Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-------------------------------
This is the happy case of course we do not need to slow down the Publisher in this case. However signalling rates are
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled
@ -184,7 +188,8 @@ As we can see, in this scenario we effectively operate in so called push-mode si
elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements.
Fast Publisher, slow Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-------------------------------
This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
the rate at which its upstream would like to emit data elements.
@ -202,7 +207,7 @@ this mode of operation is referred to as pull-based back-pressure.
.. _stream-materialization-scala:
Stream Materialization
----------------------
======================
When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
@ -222,10 +227,64 @@ which will be running on the thread pools they have been configured to run on -
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside composite Graphs is legal,
yet will materialize that stage multiple times.
Operator Fusion
---------------
Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that
the processing steps of a flow or stream graph can be executed within the same Actor and has three
consequences:
* starting up a stream may take longer than before due to executing the fusion algorithm
* passing elements from one processing stage to the next is a lot faster between fused
stages due to avoiding the asynchronous messaging overhead
* fused stream processing stages do no longer run in parallel to each other, meaning that
only up to one CPU core is used for each fused part
The first point can be countered by pre-fusing and then reusing a stream blueprint as sketched below:
.. includecode:: code/docs/stream/FlowDocSpec.scala#explicit-fusing
In order to balance the effects of the second and third bullet points you will have to insert asynchronous
boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that
shall communicate with the rest of the graph in an asynchronous fashion.
.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-async
In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding
and multiplying integers is an extremely costly operation this will lead to a performance gain since two CPUs can
work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a
flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work
by adding information to the flow graph that has been constructed up to this point:
|
.. image:: ../images/asyncBoundary.png
:align: center
:width: 700
|
This means that everything that is inside the red bubble will be executed by one actor and everything outside of it
by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all
processing stages that have been added since them.
.. warning::
Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer
that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers
may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer
there, data elements are passed without buffering between fused stages. In those cases where buffering
is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the
``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function.
The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``.
In that case you can still manually fuse those graphs which shall run on less Actors. With the exception of the
:class:`SslTlsStage` and the ``groupBy`` operator all built-in processing stages can be fused.
.. _flow-combine-mat-scala:
Combining materialized values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-----------------------------
Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary
to somehow express how these values should be composed to a final value when we plug these stages together. For this,

View file

@ -272,4 +272,10 @@ public class BidiFlowTest extends StreamTest {
Arrays.sort(rr);
assertArrayEquals(new Long[] { 3L, 12L }, rr);
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final BidiFlow<Integer, Long, ByteString, String, BoxedUnit> b =
bidi.withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -785,4 +785,9 @@ public class FlowTest extends StreamTest {
assertEquals((Object) 0, result);
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final Flow<Integer, Integer, BoxedUnit> f =
Flow.of(Integer.class).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -13,16 +13,13 @@ import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.japi.function.Function;
import akka.japi.function.Procedure;
import akka.stream.Graph;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.*;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.stream.StreamTest;
import akka.japi.function.Function2;
import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
@ -101,4 +98,9 @@ public class SinkTest extends StreamTest {
probe2.expectMsgEquals("done2");
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final Sink<Integer, Future<Integer>> s =
Sink.<Integer> head().withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -11,10 +11,7 @@ import akka.dispatch.OnSuccess;
import akka.japi.JavaPartialFunction;
import akka.japi.Pair;
import akka.japi.function.*;
import akka.stream.Graph;
import akka.stream.OverflowStrategy;
import akka.stream.StreamTest;
import akka.stream.UniformFanInShape;
import akka.stream.*;
import akka.stream.impl.ConstantFun;
import akka.stream.stage.*;
import akka.stream.testkit.AkkaSpec;
@ -776,4 +773,9 @@ public class SourceTest extends StreamTest {
assertEquals((Object) 0, result);
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final Source<Integer, BoxedUnit> f =
Source.single(42).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -105,7 +105,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple
.sorted should ===(0 to 198 by 2)
}
"use multiple actors when there are asynchronous boundaries in the subflows" in {
"use multiple actors when there are asynchronous boundaries in the subflows (manual)" in {
def ref = {
val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging]
bus.logSource
@ -120,7 +120,26 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple
.sorted should ===(0 to 9)
val refs = receiveN(20)
withClue(s"refs=\n${refs.mkString("\n")}") {
refs.toSet.size should ===(11)
refs.toSet.size should ===(11) // main flow + 10 subflows
}
}
"use multiple actors when there are asynchronous boundaries in the subflows (combinator)" in {
def ref = {
val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging]
bus.logSource
}
val flow = Flow[Int].map(x { testActor ! ref; x })
Source(0 to 9)
.map(x { testActor ! ref; x })
.flatMapMerge(5, i Source.single(i).viaAsync(flow))
.grouped(1000)
.runWith(Sink.head)
.futureValue
.sorted should ===(0 to 9)
val refs = receiveN(20)
withClue(s"refs=\n${refs.mkString("\n")}") {
refs.toSet.size should ===(11) // main flow + 10 subflows
}
}

View file

@ -111,6 +111,11 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
Await.result(r, 1.second).toSet should ===(Set(3L, 12L))
}
"suitably override attribute handling methods" in {
import Attributes._
val b: BidiFlow[Int, Long, ByteString, String, Unit] = bidi.withAttributes(name("")).addAttributes(asyncBoundary).named("")
}
}
}

View file

@ -586,6 +586,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
}
}
"suitably override attribute handling methods" in {
import Attributes._
val f: Flow[Int, Int, Unit] = Flow[Int].withAttributes(asyncBoundary).addAttributes(none).named("")
}
}
object TestException extends RuntimeException with NoStackTrace

View file

@ -3,9 +3,10 @@
*/
package akka.stream.scaladsl
import akka.stream.{ SinkShape, ActorMaterializer }
import akka.stream._
import akka.stream.testkit.TestPublisher.ManualProbe
import akka.stream.testkit._
import scala.concurrent.Future
class SinkSpec extends AkkaSpec {
@ -119,6 +120,10 @@ class SinkSpec extends AkkaSpec {
}
}
"suitably override attribute handling methods" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("")
}
}
}

View file

@ -3,18 +3,20 @@
*/
package akka.stream.scaladsl
import scala.concurrent.Await
import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Span, Millis }
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import scala.util.Failure
import scala.util.control.NoStackTrace
import akka.stream.{ SourceShape, ActorMaterializer }
import akka.stream._
import akka.stream.testkit._
import akka.stream.impl.{ PublisherSource, ReactiveStreamsCompliance }
import scala.concurrent.Future
class SourceSpec extends AkkaSpec {
class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
implicit val materializer = ActorMaterializer()
implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
"Single Source" must {
"produce element" in {
@ -213,10 +215,9 @@ class SourceSpec extends AkkaSpec {
"Repeat Source" must {
"repeat as long as it takes" in {
import GraphDSL.Implicits._
val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head), 1.second)
result.size should ===(10000)
result.toSet should ===(Set(42))
val f = Source.repeat(42).grouped(1000).runWith(Sink.head)
f.futureValue.size should ===(1000)
f.futureValue.toSet should ===(Set(42))
}
}
@ -224,36 +225,53 @@ class SourceSpec extends AkkaSpec {
val expected = List(9227465, 5702887, 3524578, 2178309, 1346269, 832040, 514229, 317811, 196418, 121393, 75025, 46368, 28657, 17711, 10946, 6765, 4181, 2584, 1597, 987, 610, 377, 233, 144, 89, 55, 34, 21, 13, 8, 5, 3, 2, 1, 1, 0)
"generate a finite fibonacci sequence" in {
val source = Source.unfold((0, 1)) {
Source.unfold((0, 1)) {
case (a, _) if a > 10000000 None
case (a, b) Some((b, a + b) a)
}.runFold(List.empty[Int]) { case (xs, x) x :: xs }
.futureValue should ===(expected)
}
"terminate with a failure if there is an exception thrown" in {
val t = new RuntimeException("expected")
whenReady(
Source.unfold((0, 1)) {
case (a, _) if a > 10000000 throw t
case (a, b) Some((b, a + b) a)
}.runFold(List.empty[Int]) { case (xs, x) x :: xs }.failed) {
_ should be theSameInstanceAs (t)
}
val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) x :: xs }, 1.second)
result should ===(expected)
}
"generate a finite fibonacci sequence asynchronously" in {
val source = Source.unfoldAsync((0, 1)) {
Source.unfoldAsync((0, 1)) {
case (a, _) if a > 10000000 Future.successful(None)
case (a, b) Future.successful(Some((b, a + b) a))
}
val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) x :: xs }, 1.second)
result should ===(expected)
case (a, b) Future(Some((b, a + b) a))(system.dispatcher)
}.runFold(List.empty[Int]) { case (xs, x) x :: xs }
.futureValue should ===(expected)
}
"generate an infinite fibonacci sequence" in {
val source = Source.unfoldInf((0, 1)) {
case (a, b) (b, a + b) a
}
val result = Await.result(source.take(36).runFold(List.empty[Int]) { case (xs, x) x :: xs }, 1.second)
result should ===(expected)
"generate an unbounded fibonacci sequence" in {
Source.unfoldInf((0, 1))({ case (a, b) (b, a + b) a })
.take(36)
.runFold(List.empty[Int]) { case (xs, x) x :: xs }
.futureValue should ===(expected)
}
}
"Iterator Source" must {
"properly iterate" in {
val result = Await.result(Source.fromIterator(() Iterator.iterate(false)(!_)).grouped(10).runWith(Sink.head), 1.second)
result should ===(Seq(false, true, false, true, false, true, false, true, false, true))
Source.fromIterator(() Iterator.iterate(false)(!_))
.grouped(10)
.runWith(Sink.head)
.futureValue should ===(Seq(false, true, false, true, false, true, false, true, false, true))
}
}
"A Source" must {
"suitably override attribute handling methods" in {
import Attributes._
val s: Source[Int, Unit] = Source.single(42).withAttributes(asyncBoundary).addAttributes(none).named("")
}
}

View file

@ -28,6 +28,12 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit],
override def withAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] =
new SubFlowImpl[In, Out, Mat, F, C](subFlow.withAttributes(attr), mergeBackFunction, finishFunction)
override def addAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] =
new SubFlowImpl[In, Out, Mat, F, C](subFlow.addAttributes(attr), mergeBackFunction, finishFunction)
override def named(name: String): SubFlow[Out, Mat, F, C] =
new SubFlowImpl[In, Out, Mat, F, C](subFlow.named(name), mergeBackFunction, finishFunction)
override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth)
def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink))

View file

@ -10,19 +10,13 @@ import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
* Unfold `GraphStage` class
* @param s initial state
* @param f unfold function
* @tparam S state
* @tparam E element
* INTERNAL API
*/
private[akka] class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("Unfold")
private[akka] final class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("Unfold.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private[this] var state = s
@ -36,37 +30,28 @@ private[akka] class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphSta
}
})
}
}
}
/**
* UnfoldAsync `GraphStage` class
* @param s initial state
* @param f unfold function
* @tparam S state
* @tparam E element
* INTERNAL API
*/
private[akka] class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldAsync")
private[akka] final class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldAsync.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private[this] var state = s
private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _
override def preStart() = {
val ac = getAsyncCallback[Try[Option[(S, E)]]] {
case Failure(ex) fail(out, ex)
case Success(None) complete(out)
case Success(Some((newS, elem))) {
case Success(Some((newS, elem)))
push(out, elem)
state = newS
}
}
asyncHandler = ac.invoke
}
@ -75,5 +60,4 @@ private[akka] class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) ext
f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
})
}
}
}

View file

@ -192,6 +192,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): BidiFlow[I1, O1, I2, O2, Mat2] =
new BidiFlow(delegate.mapMaterializedValue(f.apply _))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.named(name))
}

View file

@ -116,6 +116,47 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.viaMat(flow)(combinerToScala(combine)))
/**
* Transform this [[Flow]] by appending the given processing steps, ensuring
* that an `asyncBoundary` attribute is set around those steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* `viaMat` if a different strategy is needed.
*/
def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.viaAsync(flow))
/**
* Transform this [[Flow]] by appending the given processing steps, ensuring
* that an `asyncBoundary` attribute is set around those steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*/
def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.viaAsyncMat(flow)(combinerToScala(combine)))
/**
* Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
* {{{
@ -1431,9 +1472,28 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.named(name))

View file

@ -259,9 +259,28 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterializedValue(f.apply _))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): javadsl.Sink[In, Mat] =
new Sink(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): javadsl.Sink[In, Mat] =
new Sink(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): javadsl.Sink[In, Mat] =
new Sink(delegate.named(name))
}

View file

@ -352,6 +352,47 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.viaMat(flow)(combinerToScala(combine)))
/**
* Transform this [[Source]] by appending the given processing stages, ensuring
* that an `asyncBoundary` attribute is set around those steps.
* {{{
* +----------------------------+
* | Resulting Source |
* | |
* | +------+ +------+ |
* | | | | | |
* | | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* `viaMat` if a different strategy is needed.
*/
def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Source[T, Mat] =
new Source(delegate.viaAsync(flow))
/**
* Transform this [[Source]] by appending the given processing stages, ensuring
* that an `asyncBoundary` attribute is set around those steps.
* {{{
* +----------------------------+
* | Resulting Source |
* | |
* | +------+ +------+ |
* | | | | | |
* | | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*/
def viaAsyncMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] =
new Source(delegate.viaAsyncMat(flow)(combinerToScala(combine)))
/**
* Connect this [[Source]] to a [[Sink]], concatenating the processing steps of both.
* {{{
@ -1599,9 +1640,28 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): javadsl.Source[Out, Mat] =
new Source(delegate.named(name))

View file

@ -82,6 +82,29 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] =
new SubFlow(delegate.via(flow))
/**
* Transform this [[Flow]] by appending the given processing steps, ensuring
* that an `asyncBoundary` attribute is set around those steps.
*
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
*
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubFlow[In, T, Mat] =
new SubFlow(delegate.viaAsync(flow))
/**
* Connect this [[SubFlow]] to a [[Sink]], concatenating the processing steps of both.
* This means that all sub-flows that result from the previous sub-stream operator
@ -1040,9 +1063,28 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def initialDelay(delay: FiniteDuration): SubFlow[In, Out, Mat] =
new SubFlow(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def withAttributes(attr: Attributes): SubFlow[In, Out, Mat] =
new SubFlow(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def addAttributes(attr: Attributes): SubFlow[In, Out, Mat] =
new SubFlow(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
def named(name: String): SubFlow[In, Out, Mat] =
new SubFlow(delegate.named(name))

View file

@ -61,7 +61,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
new Source(delegate.concatSubstreams)
/**
* Transform this [[Flow]] by appending the given processing steps.
* Transform this [[SubSource]] by appending the given processing steps.
* {{{
* +----------------------------+
* | Resulting Source |
@ -80,6 +80,27 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def via[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] =
new SubSource(delegate.via(flow))
/**
* Transform this [[SubSource]] by appending the given processing steps, ensuring
* that an `asyncBoundary` attribute is set around those steps.
* {{{
* +----------------------------+
* | Resulting Source |
* | |
* | +------+ +------+ |
* | | | | | |
* | | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def viaAsync[T, M](flow: Graph[FlowShape[Out, T], M]): SubSource[T, Mat] =
new SubSource(delegate.viaAsync(flow))
/**
* Connect this [[SubSource]] to a [[Sink]], concatenating the processing steps of both.
* This means that all sub-flows that result from the previous sub-stream operator
@ -1039,9 +1060,28 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def initialDelay(delay: FiniteDuration): SubSource[Out, Mat] =
new SubSource(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def withAttributes(attr: Attributes): SubSource[Out, Mat] =
new SubSource(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def addAttributes(attr: Attributes): SubSource[Out, Mat] =
new SubSource(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
def named(name: String): SubSource[Out, Mat] =
new SubSource(delegate.named(name))

View file

@ -128,9 +128,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu
def mapMaterializedValue[Mat2](f: Mat Mat2): BidiFlow[I1, O1, I2, O2, Mat2] =
new BidiFlow(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] =
withAttributes(Attributes.name(name))
}

View file

@ -202,14 +202,27 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
}
/**
* Change the attributes of this [[Flow]] to the given ones. Note that this
* Change the attributes of this [[Flow]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out] =
if (this.module eq EmptyModule) this
if (isIdentity) this
else new Flow(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Flow. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name))
/**
@ -369,6 +382,26 @@ trait FlowOps[+Out, +Mat] {
*/
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T]
/**
* Transform this [[Flow]] by appending the given processing steps, ensuring
* that an `asyncBoundary` attribute is set around those steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def viaAsync[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = via(flow.addAttributes(Attributes.asyncBoundary))
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
@ -1585,7 +1618,9 @@ trait FlowOps[+Out, +Mat] {
def withAttributes(attr: Attributes): Repr[Out]
def named(name: String): Repr[Out] = withAttributes(Attributes.name(name))
def addAttributes(attr: Attributes): Repr[Out]
def named(name: String): Repr[Out]
/** INTERNAL API */
private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] =
@ -1620,6 +1655,26 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/
def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): ReprMat[T, Mat3]
/**
* Transform this [[Flow]] by appending the given processing steps, ensuring
* that an `asyncBoundary` attribute is set around those steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*/
def viaAsyncMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): ReprMat[T, Mat3] =
viaMat(flow.addAttributes(Attributes.asyncBoundary))(combine)
/**
* Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
* {{{

View file

@ -904,6 +904,12 @@ object GraphDSL extends GraphApply {
override def withAttributes(attr: Attributes): Repr[Out] =
throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
override def addAttributes(attr: Attributes): Repr[Out] =
throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
override def named(name: String): Repr[Out] =
throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =

View file

@ -40,9 +40,28 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
def mapMaterializedValue[Mat2](f: Mat Mat2): Sink[In, Mat2] =
new Sink(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Sink[In, Mat] =
new Sink(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): Sink[In, Mat] =
withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name))
/** Converts this Scala DSL element to it's Java DSL counterpart. */

View file

@ -110,13 +110,26 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
def runForeach(f: Out Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f))
/**
* Nests the current Source and returns a Source with the given Attributes
* @param attr the attributes to add
* @return a new Source with the added attributes
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out] =
new Source(module.withAttributes(attr).nest()) // User API
new Source(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name))
/** Converts this Scala DSL element to it's Java DSL counterpart. */