diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index b36f8718a6..536f7b1246 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ b/akka-docs-dev/rst/java/stream-customize.rst @@ -322,3 +322,19 @@ and continue with shutting down the entire stream. It is not possible to emit elements from the completion handling, since completion handlers may be invoked at any time (without regard to downstream demand being available). +Thread safety of custom processing stages +========================================= + +All of the above custom stages (linear or graph) provide a few simple guarantees that implementors can rely on. + - The callbacks exposed by all of these classes are never called concurrently. + - The state encapsulated by these classes can be safely modified from the provided callbacks, without any further + synchronization. + +In essence, the above guarantees are similar to what :class:`Actor`s provide, if one thinks of the state of a custom +stage as state of an actor, and the callbacks as the ``receive`` block of the actor. + +.. warning:: +It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it + is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** + internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined + behavior. \ No newline at end of file diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst index 969f0daacc..c41bf266e1 100644 --- a/akka-docs-dev/rst/java/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -26,12 +26,22 @@ Back-pressure A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively slowing down the upstream producer to match their consumption speeds. In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*. +Non-Blocking + Means that a certain operation does not hinder the progress of the calling thread, even if it takes long time to + finish the requested operation. Processing Stage The common name for all building blocks that build up a Flow or FlowGraph. Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like :class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``. For the full list of built-in processing stages see :ref:`stages-overview` +When we talk about *asynchronous, non-blocking backpressure* we mean that the processing stages available in Akka +Streams will not use blocking calls but asynchronous message passing to exchange messages between each other, and they +will use asynchronous means to slow down a fast producer, without blocking its thread. This is a thread-pool friendly +design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but +can hand it back for further use to an underlying thread-pool. + + Defining and running streams ---------------------------- Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst index 19473ac18e..c99ec06a2f 100644 --- a/akka-docs-dev/rst/java/stream-graphs.rst +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -217,15 +217,14 @@ The following example demonstrates a case where the materialized ``Future`` of a Graph cycles, liveness and deadlocks ------------------------------------ -By default :class:`FlowGraph` does not allow (or to be precise, its builder does not allow) the creation of cycles. -The reason for this is that cycles need special considerations to avoid potential deadlocks and other liveness issues. +Cycles in bounded flow graphs need special considerations to avoid potential deadlocks and other liveness issues. This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing graphs. -The first example demonstrates a graph that contains a naive cycle (the presence of cycles is enabled by calling -``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements -to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via -a ``Merge`` junction. +The first example demonstrates a graph that contains a naive cycle. +The graph takes elements from the source, prints them, then broadcasts those elements +to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main +via a ``Merge`` junction. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphCyclesDocTest.java#deadlocked diff --git a/akka-docs-dev/rst/java/stream-integrations.rst b/akka-docs-dev/rst/java/stream-integrations.rst index 8389fdc6b4..afc1a3e5c7 100644 --- a/akka-docs-dev/rst/java/stream-integrations.rst +++ b/akka-docs-dev/rst/java/stream-integrations.rst @@ -37,7 +37,9 @@ stream if there is demand from downstream, otherwise they will be buffered until demand is received. Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space -available in the buffer. +available in the buffer. The strategy ``OverflowStrategy.backpressure()`` is not supported +for this Source type, you should consider using ``ActorPublisher`` if you want a backpressured +actor interface. The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or ``akka.actor.Status.Success`` to the actor reference. diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index a014734a38..bfab4e952a 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -324,3 +324,19 @@ and continue with shutting down the entire stream. It is not possible to emit elements from the completion handling, since completion handlers may be invoked at any time (without regard to downstream demand being available). +Thread safety of custom processing stages +========================================= + +All of the above custom stages (linear or graph) provide a few simple guarantees that implementors can rely on. + - The callbacks exposed by all of these classes are never called concurrently. + - The state encapsulated by these classes can be safely modified from the provided callbacks, without any further + synchronization. + +In essence, the above guarantees are similar to what :class:`Actor`s provide, if one thinks of the state of a custom +stage as state of an actor, and the callbacks as the ``receive`` block of the actor. + +.. warning:: + It is **not safe** to access the state of any custom stage outside of the callbacks that it provides, just like it + is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** + internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined + behavior. \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index 12febcff4a..9ab8de8daa 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -26,12 +26,21 @@ Back-pressure A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively slowing down the upstream producer to match their consumption speeds. In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*. +Non-Blocking + Means that a certain operation does not hinder the progress of the calling thread, even if it takes long time to + finish the requested operation. Processing Stage The common name for all building blocks that build up a Flow or FlowGraph. Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like :class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``. For the full list of built-in processing stages see :ref:`stages-overview` +When we talk about *asynchronous, non-blocking backpressure* we mean that the processing stages available in Akka +Streams will not use blocking calls but asynchronous message passing to exchange messages between each other, and they +will use asynchronous means to slow down a fast producer, without blocking its thread. This is a thread-pool friendly +design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but +can hand it back for further use to an underlying thread-pool. + Defining and running streams ---------------------------- Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index d3aa5d8e9e..68182c0aa3 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -275,8 +275,8 @@ Cycles in bounded flow graphs need special considerations to avoid potential dea This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing graphs. -The first example demonstrates a graph that contains a naïve cycle (the presence of cycles is enabled by calling -``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements +The first example demonstrates a graph that contains a naïve cycle. +The graph takes elements from the source, prints them, then broadcasts those elements to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via a ``Merge`` junction. diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst index 01e97a5d8a..87f99e07f0 100644 --- a/akka-docs-dev/rst/scala/stream-integrations.rst +++ b/akka-docs-dev/rst/scala/stream-integrations.rst @@ -32,7 +32,9 @@ stream if there is demand from downstream, otherwise they will be buffered until demand is received. Depending on the defined :class:`OverflowStrategy` it might drop elements if there is no space -available in the buffer. +available in the buffer. The strategy ``OverflowStrategy.backpressure`` is not supported +for this Source type, you should consider using ``ActorPublisher`` if you want a backpressured +actor interface. The stream can be completed successfully by sending ``akka.actor.PoisonPill`` or ``akka.actor.Status.Success`` to the actor reference. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c8ba1e6a33..91467cee3c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -180,6 +180,9 @@ object Source { * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if * there is no space available in the buffer. * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does * not matter. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index bd3cf2f3b6..794a48e168 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -100,6 +100,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. * + * Please note that the startup of the server is asynchronous, i.e. after materializing the enclosing + * [[akka.stream.scaladsl.RunnableGraph]] the server is not immediately available. Only after the materialized future + * completes is the server ready to accept client connections. + * * @param interface The interface to listen on * @param port The port to listen on * @param backlog Controls the size of the connection backlog @@ -127,6 +131,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[Tcp.ServerBinding]] without specifying options. * It represents a prospective TCP server binding on the given `endpoint`. + * + * Please note that the startup of the server is asynchronous, i.e. after materializing the enclosing + * [[akka.stream.scaladsl.RunnableGraph]] the server is not immediately available. Only after the materialized future + * completes is the server ready to accept client connections. */ def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]] = Source.adapt(delegate.bind(interface, port) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index ee1ffbfd16..5782e738c0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -324,6 +324,9 @@ object Source extends SourceApply { * Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if * there is no space available in the buffer. * + * The strategy [[akka.stream.OverflowStrategy.backpressure]] is not supported, and an + * IllegalArgument("Backpressure overflowStrategy not supported") will be thrown if it is passed as argument. + * * The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does * not matter. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index fdc30ea3c4..13c85c9dbb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -119,6 +119,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. * + * Please note that the startup of the server is asynchronous, i.e. after materializing the enclosing + * [[akka.stream.scaladsl.RunnableGraph]] the server is not immediately available. Only after the materialized future + * completes is the server ready to accept client connections. + * * @param interface The interface to listen on * @param port The port to listen on * @param backlog Controls the size of the connection backlog @@ -147,6 +151,10 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` * handling the incoming connections using the provided Flow. * + * Please note that the startup of the server is asynchronous, i.e. after materializing the enclosing + * [[akka.stream.scaladsl.RunnableGraph]] the server is not immediately available. Only after the returned future + * completes is the server ready to accept client connections. + * * @param handler A Flow that represents the server logic * @param interface The interface to listen on * @param port The port to listen on