From 063e289718ed5f1aa4e6c42d0c1f5404d8ec2de7 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 19 Jan 2016 20:26:43 +0100 Subject: [PATCH] add migration guides for Scala and Java also fix missing includes and wrong file locations --- akka-contrib/docs/index.rst | 1 + .../java/code/docs/stream/MigrationsJava.java | 256 +----- akka-docs/rst/java/stream/index.rst | 1 + .../stream/migration-guide-1.0-2.x-java.rst | 720 +---------------- .../stream/migration-guide-2.0-2.4-java.rst | 75 ++ .../migration-guide-streams-2.0.x-2.4.x.rst | 52 -- akka-docs/rst/project/migration-guides.rst | 1 - .../code/docs/stream/MigrationsScala.scala | 280 +------ akka-docs/rst/scala/stream/index.rst | 1 + .../stream/migration-guide-1.0-2.x-scala.rst | 744 +----------------- .../stream/migration-guide-2.0-2.4-scala.rst | 73 ++ .../main/scala/akka/stream/javadsl/Flow.scala | 2 +- 12 files changed, 179 insertions(+), 2027 deletions(-) create mode 100644 akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst delete mode 100644 akka-docs/rst/project/migration-guide-streams-2.0.x-2.4.x.rst create mode 100644 akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst diff --git a/akka-contrib/docs/index.rst b/akka-contrib/docs/index.rst index 691f96b2a7..0f3c1f2263 100644 --- a/akka-contrib/docs/index.rst +++ b/akka-contrib/docs/index.rst @@ -36,6 +36,7 @@ The Current List of Modules peek-mailbox aggregator receive-pipeline + circuitbreaker Suggested Way of Using these Contributions ------------------------------------------ diff --git a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java index f9a666370e..84bf97bf28 100644 --- a/akka-docs/rst/java/code/docs/stream/MigrationsJava.java +++ b/akka-docs/rst/java/code/docs/stream/MigrationsJava.java @@ -3,258 +3,22 @@ */ package docs.stream; -import akka.NotUsed; -import akka.actor.ActorSystem; -import akka.actor.Cancellable; -import akka.http.javadsl.model.Uri; -import akka.dispatch.Futures; -import akka.japi.function.Creator; +import java.util.stream.Stream; + import akka.japi.Pair; -import akka.japi.function.Function; -import akka.stream.*; import akka.stream.javadsl.*; -import akka.stream.testkit.TestPublisher; -import akka.stream.testkit.TestSubscriber; -import akka.util.ByteString; -import scala.Option; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; -import scala.concurrent.Promise; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.nio.charset.Charset; public class MigrationsJava { - // This is compile-only code, no need for actually running anything. 
- public static ActorMaterializer mat = null; - public static ActorSystem sys = null; - - public static class SomeInputStream extends InputStream { - public SomeInputStream() { - } - - @Override - public int read() throws IOException { - return 0; - } - } - - public static class SomeOutputStream extends OutputStream { - @Override - public void write(int b) throws IOException { - return; - } - } - public static void main(String[] args) { - - Outlet outlet = null; - - Outlet outlet1 = null; - Outlet outlet2 = null; - - Inlet inlet = null; - - Inlet inlet1 = null; - Inlet inlet2 = null; - - Flow flow = Flow.of(Integer.class); - Flow flow1 = Flow.of(Integer.class); - Flow flow2 = Flow.of(Integer.class); - - Promise> promise = null; - - - { - Graph, NotUsed> graphSource = null; - Graph, NotUsed> graphSink = null; - Graph, NotUsed> graphFlow = null; - - //#flow-wrap - Source source = Source.fromGraph(graphSource); - Sink sink = Sink.fromGraph(graphSink); - Flow aflow = Flow.fromGraph(graphFlow); - Flow.fromSinkAndSource(Sink.head(), Source.single(0)); - Flow.fromSinkAndSourceMat(Sink.head(), Source.single(0), Keep.left()); - //#flow-wrap - - Graph, NotUsed> bidiGraph = null; - - //#bidi-wrap - BidiFlow bidiFlow = - BidiFlow.fromGraph(bidiGraph); - BidiFlow.fromFlows(flow1, flow2); - BidiFlow.fromFlowsMat(flow1, flow2, Keep.both()); - //#bidi-wrap - - } - - { - //#graph-create - GraphDSL.create(builder -> { - //... - return ClosedShape.getInstance(); - }); - - GraphDSL.create(builder -> { - //... - return new FlowShape<>(inlet, outlet); - }); - //#graph-create - } - - { - //#graph-create-2 - GraphDSL.create(builder -> { - //... - return SourceShape.of(outlet); - }); - - GraphDSL.create(builder -> { - //... - return SinkShape.of(inlet); - }); - - GraphDSL.create(builder -> { - //... - return FlowShape.of(inlet, outlet); - }); - - GraphDSL.create(builder -> { - //... - return BidiShape.of(inlet1, outlet1, inlet2, outlet2); - }); - //#graph-create-2 - } - - { - //#graph-builder - GraphDSL.create(builder -> { - builder.from(outlet).toInlet(inlet); - builder.from(outlet).via(builder.add(flow)).toInlet(inlet); - builder.from(builder.add(Source.single(0))).to(builder.add(Sink.head())); - //... - return ClosedShape.getInstance(); - }); - //#graph-builder - } - - //#source-creators - Source>> src = Source.maybe(); - // Complete the promise with an empty option to emulate the old lazyEmpty - promise.trySuccess(scala.Option.empty()); - - final Source ticks = Source.tick( - FiniteDuration.create(0, TimeUnit.MILLISECONDS), - FiniteDuration.create(200, TimeUnit.MILLISECONDS), - "tick"); - - final Source pubSource = - Source.fromPublisher(TestPublisher.manualProbe(true, sys)); - - final Source futSource = - Source.fromFuture(Futures.successful(42)); - - final Source> subSource = - Source.asSubscriber(); - //#source-creators - - //#sink-creators - final Sink subSink = - Sink.fromSubscriber(TestSubscriber.manualProbe(sys)); - //#sink-creators - - //#sink-as-publisher - final Sink> pubSink = - Sink.asPublisher(false); - - final Sink> pubSinkFanout = - Sink.asPublisher(true); - //#sink-as-publisher - - //#empty-flow - Flow emptyFlow = Flow.create(); - // or - Flow emptyFlow2 = Flow.of(Integer.class); - //#empty-flow - - //#flatMapConcat - Flow.>create(). 
- flatMapConcat(i -> i); - //#flatMapConcat - - //#group-flatten - Flow.of(Integer.class) - .groupBy(2, in -> in % 2) // the first parameter sets max number of substreams - .map(subIn -> + 3) - .concatSubstreams(); - //#group-flatten - - final int maxDistinctWords = 1000; - //#group-fold - Flow.of(String.class) - .groupBy(maxDistinctWords, i -> i) - .fold(Pair.create("", 0), (pair, word) -> Pair.create(word, pair.second() + 1)) - .mergeSubstreams(); - //#group-fold - - Uri uri = null; - //#raw-query - final Optional theRawQueryString = uri.rawQueryString(); - //#raw-query - - //#query-param - final Optional aQueryParam = uri.query().get("a"); - //#query-param - - //#file-source-sink - final Source> fileSrc = - FileIO.fromFile(new File(".")); - - final Source> otherFileSrc = - FileIO.fromFile(new File("."), 1024); - - final Sink> fileSink = - FileIO.toFile(new File(".")); - //#file-source-sink - - //#input-output-stream-source-sink - final Source> inputStreamSrc = - StreamConverters.fromInputStream((Creator) () -> new SomeInputStream()); - - final Source> otherInputStreamSrc = - StreamConverters.fromInputStream((Creator) () -> new SomeInputStream(), 1024); - - final Sink> outputStreamSink = - StreamConverters.fromOutputStream((Creator) () -> new SomeOutputStream()); - //#input-output-stream-source-sink - - - //#output-input-stream-source-sink - final FiniteDuration timeout = FiniteDuration.Zero(); - - final Source outputStreamSrc = - StreamConverters.asOutputStream(); - - final Source otherOutputStreamSrc = - StreamConverters.asOutputStream(timeout); - - final Sink someInputStreamSink = - StreamConverters.asInputStream(); - - final Sink someOtherInputStreamSink = - StreamConverters.asInputStream(timeout); - //#output-input-stream-source-sink - + //#expand-continually + Flow.of(Integer.class).expand(in -> Stream.iterate(in, i -> i).iterator()); + //#expand-continually + //#expand-state + Flow.of(Integer.class).expand(in -> + Stream.iterate(new Pair<>(in, 0), + p -> new Pair<>(in, p.second() + 1)).iterator()); + //#expand-state } } \ No newline at end of file diff --git a/akka-docs/rst/java/stream/index.rst b/akka-docs/rst/java/stream/index.rst index be5bc8f854..75458af8f9 100644 --- a/akka-docs/rst/java/stream/index.rst +++ b/akka-docs/rst/java/stream/index.rst @@ -23,3 +23,4 @@ Streams stream-cookbook ../../general/stream/stream-configuration migration-guide-1.0-2.x-java + migration-guide-2.0-2.4-java diff --git a/akka-docs/rst/java/stream/migration-guide-1.0-2.x-java.rst b/akka-docs/rst/java/stream/migration-guide-1.0-2.x-java.rst index 11a72ec1da..6b3001914a 100644 --- a/akka-docs/rst/java/stream/migration-guide-1.0-2.x-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-1.0-2.x-java.rst @@ -4,722 +4,6 @@ Migration Guide 1.0 to 2.x ########################## -The 2.0 release contains some structural changes that require some -simple, mechanical source-level changes in client code. While these are detailed below, -there is another change that may have an impact on the runtime behavior of your streams -and which therefore is listed first. +For this migration guide see `the documentation for Akka Streams 2.0`_. -Operator Fusion is on by default -================================ - -Akka Streams 2.0 contains an initial version of stream operator fusion support. 
This means that -the processing steps of a flow or stream graph can be executed within the same Actor and has three -consequences: - - * starting up a stream may take longer than before due to executing the fusion algorithm - * passing elements from one processing stage to the next is a lot faster between fused - stages due to avoiding the asynchronous messaging overhead - * fused stream processing stages do no longer run in parallel to each other, meaning that - only up to one CPU core is used for each fused part - -The first point can be countered by pre-fusing and then reusing a stream blueprint, see ``akka.stream.Fusing``. -In order to balance the effects of the second and third bullet points you will have to insert asynchronous -boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that -shall communicate with the rest of the graph in an asynchronous fashion. - -.. warning:: - - Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer - that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers - may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer - there, data elements are passed without buffering between fused stages. In those cases where buffering - is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the - ``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function. - -The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``. -In that case you can still manually fuse those graphs which shall run on less Actors. Fusable elements are - - * all GraphStages (this includes all built-in junctions apart from ``groupBy``) - * all Stages (this includes all built-in linear operators) - * TCP connections - -Introduced proper named constructor methods instead of ``wrap()`` -================================================================= - -There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of -the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing -the number of overloads) and finding relevant methods in the documentation. - -Creating a Flow from other stages ---------------------------------- - -It was possible to create a ``Flow`` from a graph with the correct shape (``FlowShape``) using ``wrap()``. Now this -must be done with the more descriptive method ``Flow.fromGraph()``. - -It was possible to create a ``Flow`` from a ``Source`` and a ``Sink`` using ``wrap()``. Now this functionality can -be accessed trough the more descriptive methods ``Flow.fromSinkAndSource`` and ``Flow.fromSinkAndSourceMat``. - - -Creating a BidiFlow from other stages -------------------------------------- - -It was possible to create a ``BidiFlow`` from a graph with the correct shape (``BidiShape``) using ``wrap()``. Now this -must be done with the more descriptive method ``BidiFlow.fromGraph()``. - -It was possible to create a ``BidiFlow`` from two ``Flow`` s using ``wrap()``. Now this functionality can -be accessed trough the more descriptive methods ``BidiFlow.fromFlows`` and ``BidiFlow.fromFlowsMat``. - -Update procedure ----------------- - -1. Replace all uses of ``Flow.wrap`` when it converts a ``Graph`` to a ``Flow`` with ``Flow.fromGraph`` -2. 
Replace all uses of ``Flow.wrap`` when it converts a ``Source`` and ``Sink`` to a ``Flow`` with - ``Flow.fromSinkAndSource`` or ``Flow.fromSinkAndSourceMat`` -3. Replace all uses of ``BidiFlow.wrap`` when it converts a ``Graph`` to a ``BidiFlow`` with ``BidiFlow.fromGraph`` -4. Replace all uses of ``BidiFlow.wrap`` when it converts two ``Flow`` s to a ``BidiFlow`` with - ``BidiFlow.fromFlows`` or ``BidiFlow.fromFlowsMat`` -5. Replace all uses of ``BidiFlow.apply()`` (Scala DSL) or ``BidiFlow.create()`` (Java DSL) when it converts two - functions to a ``BidiFlow`` with ``BidiFlow.fromFunctions`` - -Example -^^^^^^^ - -:: - - Graph, BoxedUnit> graphSource = ...; - // This no longer works! - Source source = Source.wrap(graphSource); - - Graph, BoxedUnit> graphSink = ...; - // This no longer works! - Sink sink = Sink.wrap(graphSink); - - Graph, BoxedUnit> graphFlow = ...; - // This no longer works! - Flow flow = Flow.wrap(graphFlow); - - // This no longer works! - Flow.wrap(Sink.head(), Source.single(0), Keep.left()); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#flow-wrap - -and - -:: - - Graph, BoxedUnit> bidiGraph = ...; - // This no longer works! - BidiFlow bidiFlow = BidiFlow.wrap(bidiGraph); - - // This no longer works! - BidiFlow.wrap(flow1, flow2, Keep.both()); - - -Should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#bidi-wrap - - -Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape`` -================================================================================================================== - -The input and output ports of these shapes where called ``inlet()`` and ``outlet()`` compared to other shapes that -consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``. - -Update procedure ----------------- - -Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the ports -of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`. - - -FlowGraph class and builder methods have been renamed -===================================================== - -Due to incorrect overlap with the :class:`Flow` concept we renamed the :class:`FlowGraph` class to :class:`GraphDSL`. -There is now only one graph creation method called ``create`` which is analogous to the old ``partial`` method. For -closed graphs now it is explicitly required to return ``ClosedShape`` at the end of the builder block. - -Update procedure ----------------- - -1. Search and replace all occurrences of ``FlowGraph`` with ``GraphDSL``. -2. Replace all occurrences of ``GraphDSL.partial()`` or ``GraphDSL.closed()`` with ``GraphDSL.create()``. -3. Add ``ClosedShape`` as a return value of the builder block if it was ``FlowGraph.closed()`` before. -4. Wrap the closed graph with ``RunnableGraph.fromGraph`` if it was ``FlowGraph.closed()`` before. - -Example -^^^^^^^ - -:: - - // This no longer works! - FlowGraph.factory().closed(builder -> { - //... - }); - - // This no longer works! - FlowGraph.factory().partial(builder -> { - //... - return new FlowShape<>(inlet, outlet); - }); - -should be replaced by - -.. 
includecode:: ../code/docs/stream/MigrationsJava.java#graph-create - -Methods that create Source, Sink, Flow from Graphs have been removed -==================================================================== - -Previously there were convenience methods available on ``Sink``, ``Source``, ``Flow`` an ``BidiFlow`` to create -these DSL elements from a graph builder directly. Now this requires two explicit steps to reduce the number of overloaded -methods (helps Java 8 type inference) and also reduces the ways how these elements can be created. There is only one -graph creation method to learn (``GraphDSL.create``) and then there is only one conversion method to use ``fromGraph()``. - -This means that the following methods have been removed: - - ``adapt()`` method on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (both DSLs) - - ``apply()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Scala DSL) - - ``create()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Java DSL) - -Update procedure ----------------- - -Everywhere where ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` is created from a graph using a builder have to -be replaced with two steps - -1. Create a ``Graph`` with the correct ``Shape`` using ``GraphDSL.create`` (e.g.. for ``Source`` it means first - creating a ``Graph`` with ``SourceShape``) -2. Create the required DSL element by calling ``fromGraph()`` on the required DSL element (e.g. ``Source.fromGraph``) - passing the graph created in the previous step - -Example -^^^^^^^ - -:: - - // This no longer works! - Source.factory().create(builder -> { - //... - return outlet; - }); - - // This no longer works! - Sink.factory().create(builder -> { - //... - return inlet; - }); - - // This no longer works! - Flow.factory().create(builder -> { - //... - return new Pair<>(inlet, outlet); - }); - - // This no longer works! - BidiFlow.factory().create(builder -> { - //... - return new BidiShape<>(inlet1, outlet1, inlet2, outlet2); - }); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#graph-create-2 - -Some graph Builder methods have been removed -============================================ - -Due to the high number of overloads Java 8 type inference suffered, and it was also hard to figure out which time -to use which method. Therefore various redundant methods have been removed. As a consequence, every ``Sink``, ``Source`` -and ``Flow`` needs to be explicitly added via ``builder.add()``. - -Update procedure ----------------- - -1. All uses of ``builder.edge(outlet,inlet)`` should be replaced by the alternative ``builder.from(outlet).toInlet(inlet)`` -3. All uses of ``builder.source`` should be replaced by ``builder.from(builder.add(source))`` -4. All uses of ``builder.flow`` should be replaced by ``builder.….via(builder.add(flow))`` -5. All uses of ``builder.sink`` should be replaced by ``builder.….to(builder.add(sink)))`` - -:: - - FlowGraph.factory().closed(builder -> { - // These no longer work - builder.edge(outlet, inlet); - builder.flow(outlet, flow, inlet); - builder.source(Source.single(0)); - builder.sink(Sink.head()); - //... - }); - -should be replaced by - -.. 
includecode:: ../code/docs/stream/MigrationsJava.java#graph-builder - -Source constructor name changes -=============================== - -``Source.lazyEmpty`` has been replaced by ``Source.maybe`` which returns a ``Promise`` that can be completed by one or -zero elements by providing an ``Option``. This is different from ``lazyEmpty`` which only allowed completion to be -sent, but no elements. - -The ``from()`` overload on ``Source`` has been refactored to separate methods to reduce the number of overloads and -make source creation more discoverable. - -``Source.subscriber`` has been renamed to ``Source.asSubscriber``. - -Update procedure ----------------- - -1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with - a ``None`` (an empty ``Option``) -2. Replace all uses of ``Source.from(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)`` -3. Replace all uses of ``Source.from(publisher)`` with the method ``Source.fromPublisher(publisher)`` -4. Replace all uses of ``Source.from(future)`` with the method ``Source.fromFuture(future))`` -5. Replace all uses of ``Source.subscriber`` with the method ``Source.asSubscriber`` - -Example -^^^^^^^ - -:: - - // This no longer works! - Source> src = Source.lazyEmpty(); - //... - promise.trySuccess(BoxedUnit.UNIT); - - // This no longer works! - final Source ticks = Source.from( - FiniteDuration.create(0, TimeUnit.MILLISECONDS), - FiniteDuration.create(200, TimeUnit.MILLISECONDS), - "tick"); - - // This no longer works! - final Source pubSource = - Source.from(TestPublisher.manualProbe(true, sys)); - - // This no longer works! - final Source futSource = - Source.from(Futures.successful(42)); - - // This no longer works! - final Source> subSource = - Source.subscriber(); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#source-creators - -Sink constructor name changes -============================= - -``Sink.create(subscriber)`` has been renamed to ``Sink.fromSubscriber(subscriber)`` to reduce the number of overloads and -make sink creation more discoverable. - -Update procedure ----------------- - -1. Replace all uses of ``Sink.create(subscriber)`` with the method ``Sink.fromSubscriber(subscriber)`` - -Example -^^^^^^^ - -:: - - // This no longer works! - final Sink subSink = - Sink.create(TestSubscriber.manualProbe(sys)); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#sink-creators - -``Flow.empty()`` have been removed -================================== - -The ``empty()`` method has been removed since it behaves exactly the same as ``create()``, creating a ``Flow`` with no -transformations added yet. - -Update procedure ----------------- - -1. Replace all uses of ``Flow.empty()`` with ``Flow.create``. - -:: - - // This no longer works! - Flow emptyFlow = Flow.empty(); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#empty-flow - -``flatten(FlattenStrategy)`` has been replaced by named counterparts -==================================================================== - -To simplify type inference in Java 8 and to make the method more discoverable, ``flatten(FlattenStrategy.concat)`` -has been removed and replaced with the alternative method ``flatMapConcat(f)``. - -Update procedure ----------------- - -1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)`` -2. 
Consider replacing ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)`` - -Example -^^^^^^^ - -:: - - Flow.>create().flatten(FlattenStrategy.concat()); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#flatMapConcat - -`Sink.fanoutPublisher() and Sink.publisher() is now a single method` -==================================================================== - -It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support -a single ``Subscriber``, and the discoverability of the apprpriate fix was non-obvious (Sink.fanoutPublisher). -To make the decision whether to support fanout or not an active one, the aforementioned methods have been -replaced with a single method: ``Sink.asPublisher(fanout: Boolean)``. - -Update procedure ----------------- - -1. Replace all occurrences of ``Sink.publisher`` with ``Sink.asPublisher(false)`` -2. Replace all occurrences of ``Sink.fanoutPublisher`` with ``Sink.asPublisher(true)`` - -Example -^^^^^^^ - -:: - - // This no longer works! - final Sink> pubSink = - Sink.publisher(); - - // This no longer works! - final Sink> pubSink = - Sink.fanoutPublisher(2, 8); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#sink-as-publisher - -FlexiMerge an FlexiRoute has been replaced by GraphStage -======================================================== - -The ``FlexiMerge`` and ``FlexiRoute`` DSLs have been removed since they provided an abstraction that was too limiting -and a better abstraction have been created which is called ``GraphStage``. ``GraphStage`` can express fan-in and -fan-out stages, but many other constructs as well with possibly multiple input and output ports (e.g. a ``BidiStage``). - -This new abstraction provides a more uniform way to crate custom stream processing stages of arbitrary ``Shape``. In -fact, all of the built-in fan-in and fan-out stages are now implemented in terms of ``GraphStage``. - -Update procedure ----------------- - -*There is no simple update procedure. The affected stages must be ported to the new ``GraphStage`` DSL manually. Please -read the* ``GraphStage`` *documentation (TODO) for details.* - -GroupBy, SplitWhen and SplitAfter now return SubFlow or SubSource -================================================================= - -Previously the ``groupBy``, ``splitWhen``, and ``splitAfter`` -combinators returned a type that included a :class:`Source` within its -elements. Transforming these substreams was only possible by nesting -the respective combinators inside a ``map`` of the outer stream. -While this design enabled maximum flexibility for handling substreams, -it ultimately made it too easy to create a (potentially suprising) -deadlock. You can read more in `SubFlow-Motivation-Thread`_. - -These operations have been made more convenient and also safer by -dropping down into transforming the substreams instead: the return -type is now a :class:`SubFlow` that does not implement the -:class:`Graph` interface and therefore only represents an unfinished -intermediate builder step. The substream mode can be ended by closing -the substreams (i.e. attaching a :class:`Sink`) or merging them back -together. - -.. _SubFlow-Motivation-Thread: https://groups.google.com/d/msg/akka-user/_blLOcIHxJ4/i1DOoylmEgAJ - -Update Procedure ----------------- - -The transformations that were done on the substreams need to be lifted -up one level. 
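As a minimal sketch of what "lifting up one level" looks like in the Java DSL (assuming the
Akka Streams 2.0 ``javadsl`` API; it mirrors the ``#group-flatten`` snippet referenced below)::

  // the per-substream transformation is written directly after groupBy,
  // instead of mapping over the inner Sources of the old API
  Flow.of(Integer.class)
      .groupBy(2, i -> i % 2)   // enters substream mode (at most 2 substreams)
      .map(i -> i + 3)          // applied to every substream individually
      .concatSubstreams();      // (or mergeSubstreams) leaves substream mode again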
This only works for cases where the processing topology -is homogenous for all substreams. If your substream processing -topology is heterogeneous, consider creating a graph (see -:ref:`stream-graph-java`). - -Example -^^^^^^^ - -:: - - Flow. create() - // This no longer works! - .groupBy(i -> i % 2) - // This no longer works! - .map(pair -> pair.second().map(i -> i + 3)) - // This no longer works! - .flatten(FlattenStrategy.concat()) - -This is implemented now as - -.. includecode:: ../code/docs/stream/MigrationsJava.java#group-flatten - -Example 2 -^^^^^^^^^ - -:: - - Flow. create() - // This no longer works! - .groupBy(i -> i) - // This no longer works! - .map(pair -> - pair.second().runFold(new Pair<>(pair.first(), 0), - (pair, word) -> new Pair<>(word, pair.second() + 1))) - // This no longer works! - .mapAsyncUnordered(4, i -> i) - -This is implemented now as - -.. includecode:: ../code/docs/stream/MigrationsJava.java#group-fold - -Semantic change in ``isHoldingUpstream`` in the DetachedStage DSL -================================================================= - -The ``isHoldingUpstream`` method used to return true if the upstream port was in holding state and a completion arrived -(inside the ``onUpstreamFinished`` callback). Now it returns ``false`` when the upstream is completed. - -Update procedure ----------------- - -1. Those stages that relied on the previous behavior need to introduce an extra ``Boolean`` field with initial value - ``false`` -2. This field must be set on every call to ``holdUpstream()`` (and variants). -3. In completion, instead of calling ``isHoldingUpstream`` read this variable instead. - -See the example in the AsyncStage migration section for an example of this procedure. - -StatefulStage has been replaced by GraphStage -============================================= - -The :class:`StatefulStage` class had some flaws and limitations, most notably around completion handling which -caused subtle bugs. The new :class:`GraphStage` (:ref:`graphstage-java`) solves these issues and should be used -instead. - -Update procedure ----------------- - -There is no mechanical update procedure available. Please consult the :class:`GraphStage` documentation -(:ref:`graphstage-java`). - - -AsyncStage has been replaced by GraphStage -========================================== - -Due to its complexity and inflexibility ``AsyncStage`` have been removed in favor of ``GraphStage``. Existing -``AsyncStage`` implementations can be ported in a mostly mechanical way. - -Update procedure ----------------- - -1. The subclass of ``AsyncStage`` should be replaced by ``GraphStage`` -2. The new subclass must define an ``in`` and ``out`` port (``Inlet`` and ``Outlet`` instance) and override the ``shape`` - method returning a ``FlowShape`` -3. An instance of ``GraphStageLogic`` must be returned by overriding ``createLogic()``. The original processing logic and - state will be encapsulated in this ``GraphStageLogic`` -4. Using ``setHandler(port, handler)`` and ``InHandler`` instance should be set on ``in`` and an ``OutHandler`` should - be set on ``out`` -5. ``onPush``, ``onUpstreamFinished`` and ``onUpstreamFailed`` are now available in the ``InHandler`` subclass created - by the user -6. ``onPull`` and ``onDownstreamFinished`` are now available in the ``OutHandler`` subclass created by the user -7. the callbacks above no longer take an extra `ctxt` context parameter. -8. ``onPull`` only signals the stage, the actual element can be obtained by calling ``grab(in)`` -9. 
``ctx.push(elem)`` is now ``push(out, elem)`` -10. ``ctx.pull()`` is now ``pull(in)`` -11. ``ctx.finish()`` is now ``completeStage()`` -12. ``ctx.pushAndFinish(elem)`` is now simply two calls: ``push(out, elem); completeStage()`` -13. ``ctx.fail(cause)`` is now ``failStage(cause)`` -14. ``ctx.isFinishing()`` is now ``isClosed(in)`` -15. ``ctx.absorbTermination()`` can be replaced with ``if (isAvailable(shape.outlet)) `` -16. ``ctx.pushAndPull(elem)`` can be replaced with ``push(out, elem); pull(in)`` -17. ``ctx.holdUpstreamAndPush`` and ``context.holdDownstreamAndPull`` can be replaced by simply ``push(elem)`` and - ``pull()`` respectively -18. The following calls should be removed: ``ctx.ignore()``, ``ctx.holdUpstream()`` and ``ctx.holdDownstream()``. -19. ``ctx.isHoldingUpstream()`` can be replaced with ``isAvailable(out)`` -20. ``ctx.isHoldingDowntream()`` can be replaced with ``!(isClosed(in) || hasBeenPulled(in))`` -21. ``ctx.getAsyncCallback()`` is now ``getAsyncCallback(callback)`` which now takes a callback as a parameter. This - would correspond to the ``onAsyncInput()`` callback in the original ``AsyncStage`` - -We show the necessary steps in terms of an example ``AsyncStage`` - -Example -^^^^^^^ - -TODO - -Akka HTTP: Uri parsing mode relaxed-with-raw-query replaced with rawQueryString -=============================================================================== - -Previously Akka HTTP allowed to configure the parsing mode of an Uri's Query part (``?a=b&c=d``) to ``relaxed-with-raw-query`` -which is useful when Uris are not formatted using the usual "key/value pairs" syntax. - -Instead of exposing it as an option for the parser, this is now available as the ``Option rawQueryString()`` -/ ``Option queryString()`` methods on on ``model.Uri``. - -For parsing the Query part use ``Query query(Charset charset, Uri.ParsingMode mode)``. - -Update procedure ----------------- -1. If the ``uri-parsing-mode`` was set to ``relaxed-with-raw-query``, remove it -2. In places where the query string was accessed in ``relaxed-with-raw-query`` mode, use the ``rawQueryString``/``queryString`` methods instead -3. In places where the parsed query parts (such as ``parameter``) were used, invoke parsing directly using ``uri.query().get("a")`` - -Example -^^^^^^^ - -:: - - // config, no longer works - akka.http.parsing.uri-parsing-mode = relaxed-with-raw-query - -should be replaced by: - -.. includecode:: ../code/docs/stream/MigrationsJava.java#raw-query - -And use of query parameters from ``Uri`` that looked like this: - -:: - - // This no longer works! - uri.parameter("name"); - -should be replaced by: - -.. includecode:: ../code/docs/stream/MigrationsJava.java#query-param - -SynchronousFileSource and SynchronousFileSink -============================================= - -Both have been replaced by ``FileIO.toFile(…)`` and ``FileIO.fromFile(…)`` due to discoverability issues -paired with names which leaked internal implementation details. - -Update procedure ----------------- - -Replace ``SynchronousFileSource.create(`` with ``FileIO.fromFile(`` - -Replace ``SynchronousFileSink.create(`` with ``FileIO.toFile(`` - -Replace ``SynchronousFileSink.appendTo(f)`` with ``FileIO.toFile(f, true)`` - -Example -^^^^^^^ - -:: - - // This no longer works! - final Source> src = - SynchronousFileSource.create(new File(".")); - - // This no longer works! - final Source> src = - SynchronousFileSource.create(new File("."), 1024); - - // This no longer works! 
- final Sink> sink = - `SynchronousFileSink.appendTo(new File(".")); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#file-source-sink - -InputStreamSource and OutputStreamSink -====================================== - -Both have been replaced by ``StreamConverters.fromInputStream(…)`` and ``StreamConverters.fromOutputStream(…)`` due to discoverability issues. - -Update procedure ----------------- - -Replace ``InputStreamSource.create(`` with ``StreamConverters.fromInputStream(`` - -Replace ``OutputStreamSink.create(`` with ``StreamConverters.fromOutputStream(`` - -Example -^^^^^^^ - -:: - - // This no longer works! - final Source> inputStreamSrc = - InputStreamSource.create(new Creator(){ - public InputStream create() { - return new SomeInputStream(); - } - }); - - // This no longer works! - final Source> otherInputStreamSrc = - InputStreamSource.create(new Creator(){ - public InputStream create() { - return new SomeInputStream(); - } - }, 1024); - - // This no longer works! - final Sink> outputStreamSink = - OutputStreamSink.create(new Creator(){ - public OutputStream create() { - return new SomeOutputStream(); - } - }) - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#input-output-stream-source-sink - - -OutputStreamSource and InputStreamSink -====================================== - -Both have been replaced by ``StreamConverters.asOutputStream(…)`` and ``StreamConverters.asInputStream(…)`` due to discoverability issues. - -Update procedure ----------------- - -Replace ``OutputStreamSource.create(`` with ``StreamConverters.asOutputStream(`` - -Replace ``InputStreamSink.create(`` with ``StreamConverters.asInputStream(`` - -Example -^^^^^^^ - -:: - - // This no longer works! - final Source outputStreamSrc = - OutputStreamSource.create(); - - // This no longer works! - final Source otherOutputStreamSrc = - OutputStreamSource.create(timeout); - - // This no longer works! - final Sink someInputStreamSink = - InputStreamSink.create(); - - // This no longer works! - final Sink someOtherInputStreamSink = - InputStreamSink.create(timeout); - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsJava.java#output-input-stream-source-sink \ No newline at end of file +.. _`the documentation for Akka Streams 2.0`: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/java/migration-guide-1.0-2.x-java.html diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst new file mode 100644 index 0000000000..bcb5b2db83 --- /dev/null +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -0,0 +1,75 @@ +.. _migration-streams-2.0-2.4-java: + +############################## +Migration Guide 2.0.x to 2.4.x +############################## + +General notes +============= + +akka.Done and akka.NotUsed replacing Unit and BoxedUnit +------------------------------------------------------- + +To provide more clear signatures and have a unified API for both +Java and Scala two new types have been introduced: + +``akka.NotUsed`` is meant to be used instead of ``Unit`` in Scala +and ``BoxedUnit`` in Java to signify that the type parameter is required +but not actually used. This is commonly the case with ``Source``, ``Flow`` and ``Sink`` +that do not materialize into any value. + +``akka.Done`` is added for the use case where it is boxed inside another object to signify +completion but there is no actual value attached to the completion. 
It is used to replace
+occurrences of ``Future<BoxedUnit>`` with ``Future<Done>`` in Java and ``Future[Unit]`` with
+``Future[Done]`` in Scala.
+
+All previous usage of ``Unit`` and ``BoxedUnit`` for these two cases in the Akka Streams APIs
+has been updated.
+
+This means that Java code like this::
+
+  Source<String, BoxedUnit> source = Source.from(Arrays.asList("1", "2", "3"));
+  Sink<String, Future<BoxedUnit>> sink = Sink.ignore();
+
+needs to be changed into::
+
+  Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));
+  Sink<String, Future<Done>> sink = Sink.ignore();
+
+These changes apply to all the places where streams are used, which means that signatures
+in the persistent query APIs are also affected.
+
+Changed Operators
+=================
+
+``expand()`` is now based on an Iterator
+----------------------------------------
+
+Previously the ``expand`` combinator required two functions as input: the first
+one lifted incoming values into an extrapolation state and the second one
+extracted values from that, possibly evolving that state. This has been
+simplified into a single function that turns the incoming element into an
+Iterator.
+
+The most prominent use case previously was simply to repeat the last received value::
+
+  // This no longer works!
+  Flow.of(Integer.class).expand(i -> i, i -> new Pair<>(i, i));
+
+In Akka 2.4.x this is simplified to:
+
+.. includecode:: ../code/docs/stream/MigrationsJava.java#expand-continually
+
+If state needs to be kept during the expansion process then this state will
+need to be managed by the Iterator. The example of counting the number of
+expansions might previously have looked like::
+
+  // This no longer works!
+  Flow.of(Integer.class).expand(i -> new Pair<>(i, 0),
+    pair -> new Pair<>(new Pair<>(pair.first(), pair.second()),
+                       new Pair<>(pair.first(), pair.second() + 1)));
+
+In Akka 2.4.x this is formulated like so:
+
+.. includecode:: ../code/docs/stream/MigrationsJava.java#expand-state
+
diff --git a/akka-docs/rst/project/migration-guide-streams-2.0.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-streams-2.0.x-2.4.x.rst
deleted file mode 100644
index 7c4691143e..0000000000
--- a/akka-docs/rst/project/migration-guide-streams-2.0.x-2.4.x.rst
+++ /dev/null
@@ -1,52 +0,0 @@
-.. _migration-streams-2.0.x-2.4.x:
-
-###########################################
-Migration Guide Akka Streams 2.0.x to 2.4.x
-###########################################
-
-General notes
-=============
-
-
-
-akka.Done and akka.NotUsed replacing Unit and BoxedUnit
--------------------------------------------------------
-To provide more clear signatures and have a unified API for both
-Java and Scala two new types have been introduced:
-
-``akka.NotUsed`` is meant to be used instead of ``Unit`` in Scala
-and ``BoxedUnit`` in Java to signify that the type parameter is required
-but not actually used. This is commonly the case with ``Source``s, ``Flow``s and ``Sink``s
-that do not materialize into any value.
-
-``akka.Done`` is added for the use case where it is boxed inside another object to signify
-completion but there is no actual value attached to the completion. It is used to replace
-occurrences of ``Future<BoxedUnit>`` with ``Future<Done>`` in Java and ``Future[Unit]`` with
- ``Future[Done]`` in Scala.
-
-All previous usage of ``Unit`` and ``BoxedUnit`` for these two cases in the akka streams APIs
-has been updated.
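As a usage-level sketch (not taken from this patch; ``materializer`` is a placeholder for an
``ActorMaterializer`` in scope), the new types also show up in the values a stream materializes::

  // running a stream now materializes NotUsed and Future<Done> instead of BoxedUnit
  Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));
  Future<Done> done = source.runWith(Sink.ignore(), materializer);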
- -This means that Java code like this:: - - Source source = Source.from(Arrays.asList("1", "2", "3")); - Sink> sink = Sink.ignore() - -needs to be changed into:: - - Source source = Source.from(Arrays.asList("1", "2", "3")); - Sink> sink = Sink.ignore() - -And Scala code like this:: - - Source[Int, Unit] source = Source.from(1 to 5) - Sink[Int, Future[Unit]] sink = Sink.ignore() - - -needs to be changed into:: - - Source[Int, NotUsed] source = Source.from(1 to 5) - Sink[Int, Future[Done]] sink = Sink.ignore() - -These changes apply to all the places where streams are used, which means that signatures -in the persistent query APIs also are affected. diff --git a/akka-docs/rst/project/migration-guides.rst b/akka-docs/rst/project/migration-guides.rst index 441beea1ed..d7a7b031eb 100644 --- a/akka-docs/rst/project/migration-guides.rst +++ b/akka-docs/rst/project/migration-guides.rst @@ -14,4 +14,3 @@ Migration Guides migration-guide-eventsourced-2.3.x migration-guide-2.3.x-2.4.x migration-guide-2.4.x-2.5.x - migration-streams-2.0.x-2.4.x diff --git a/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala b/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala index 6cba14852e..4cb93c0288 100644 --- a/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala +++ b/akka-docs/rst/scala/code/docs/stream/MigrationsScala.scala @@ -3,280 +3,26 @@ */ package docs.stream -import java.io.File - -import _root_.akka.http.scaladsl.model.Uri -import _root_.akka.stream._ -import _root_.akka.stream.scaladsl._ -import _root_.akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import _root_.akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } - -import scala.concurrent.duration._ -import scala.concurrent.{ ExecutionContext, Future, Promise } -import scala.util.{ Failure, Random, Success, Try } +import akka.stream.scaladsl._ +import akka.stream.testkit.AkkaSpec class MigrationsScala extends AkkaSpec { "Examples in migration guide" must { "compile" in { - val flow1 = Flow[Int] - val flow2 = Flow[Int] - - def inlet: Inlet[Int] = ??? - def outlet: Outlet[Int] = ??? - - def inlet1: Inlet[Int] = ??? - def outlet1: Outlet[Int] = ??? - def inlet2: Inlet[Int] = ??? - def outlet2: Outlet[Int] = ??? - lazy val dontExecuteMe = { - //#flow-wrap - val graphSource: Graph[SourceShape[Int], Unit] = ??? - val source: Source[Int, Unit] = Source.fromGraph(graphSource) - - val graphSink: Graph[SinkShape[Int], Unit] = ??? - val sink: Sink[Int, Unit] = Sink.fromGraph(graphSink) - - val graphFlow: Graph[FlowShape[Int, Int], Unit] = ??? - val flow: Flow[Int, Int, Unit] = Flow.fromGraph(graphFlow) - - Flow.fromSinkAndSource(Sink.head[Int], Source.single(0)) - //#flow-wrap - - //#bidiflow-wrap - val bidiGraph: Graph[BidiShape[Int, Int, Int, Int], Unit] = ??? - val bidi: BidiFlow[Int, Int, Int, Int, Unit] = BidiFlow.fromGraph(bidiGraph) - - BidiFlow.fromFlows(flow1, flow2) - - BidiFlow.fromFunctions((x: Int) => x + 1, (y: Int) => y * 3) - //#bidiflow-wrap - - //#graph-create - // Replaces GraphDSL.closed() - GraphDSL.create() { builder => - //... - ClosedShape - } - - // Replaces GraphDSL.partial() - GraphDSL.create() { builder => - //... - FlowShape(inlet, outlet) - } - //#graph-create - - //#graph-create-2 - Source.fromGraph( - GraphDSL.create() { builder => - //... 
- SourceShape(outlet) + //#expand-continually + Flow[Int].expand(Iterator.continually(_)) + //#expand-continually + //#expand-state + Flow[Int].expand(i => { + var state = 0 + Iterator.continually({ + state += 1 + (i, state) }) - - Sink.fromGraph( - GraphDSL.create() { builder => - //... - SinkShape(inlet) - }) - - Flow.fromGraph( - GraphDSL.create() { builder => - //... - FlowShape(inlet, outlet) - }) - - BidiFlow.fromGraph( - GraphDSL.create() { builder => - //... - BidiShape(inlet1, outlet1, inlet2, outlet2) - }) - //#graph-create-2 - - //#graph-edges - RunnableGraph.fromGraph( - GraphDSL.create() { implicit builder => - import GraphDSL.Implicits._ - outlet ~> inlet - outlet ~> flow ~> inlet - //... - ClosedShape - }) - //#graph-edges - - val promise = Promise[Unit]() - - //#source-creators - val src: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] - //... - // This finishes the stream without emitting anything, just like Source.lazyEmpty did - promise.trySuccess(Some(())) - - val ticks = Source.tick(1.second, 3.seconds, "tick") - - val pubSource = Source.fromPublisher(TestPublisher.manualProbe[Int]()) - - val itSource = Source.fromIterator(() => Iterator.continually(Random.nextGaussian)) - - val futSource = Source.fromFuture(Future.successful(42)) - - val subSource = Source.asSubscriber - //#source-creators - - //#sink-creators - val subSink = Sink.fromSubscriber(TestSubscriber.manualProbe[Int]()) - //#sink-creators - - //#sink-as-publisher - val pubSink = Sink.asPublisher(fanout = false) - - val pubSinkFanout = Sink.asPublisher(fanout = true) - //#sink-as-publisher - - //#flatMapConcat - Flow[Source[Int, Any]].flatMapConcat(identity) - //#flatMapConcat - - //#group-flatten - Flow[Int] - .groupBy(2, _ % 2) // the first parameter sets max number of substreams - .map(_ + 3) - .concatSubstreams - //#group-flatten - - val MaxDistinctWords = 1000 - //#group-fold - Flow[String] - .groupBy(MaxDistinctWords, identity) - .fold(("", 0))((pair, word) => (word, pair._2 + 1)) - .mergeSubstreams - //#group-fold - - //#port-async - class MapAsyncOne[In, Out](f: In ⇒ Future[Out])(implicit ec: ExecutionContext) - extends GraphStage[FlowShape[In, Out]] { - val in: Inlet[In] = Inlet("MapAsyncOne.in") - val out: Outlet[Out] = Outlet("MapAsyncOne.out") - override val shape: FlowShape[In, Out] = FlowShape(in, out) - - // The actual logic is encapsulated in a GraphStageLogic now - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) { - - // All of the state *must* be encapsulated in the GraphStageLogic, - // not in the GraphStage - private var elemInFlight: Out = _ - - val callback = getAsyncCallback(onAsyncInput) - var holdingUpstream = false - - // All upstream related events now are handled in an InHandler instance - setHandler(in, new InHandler { - // No context or element parameter for onPush - override def onPush(): Unit = { - // The element is not passed as an argument but needs to be dequeued explicitly - val elem = grab(in) - val future = f(elem) - future.onComplete(callback.invoke) - // ctx.holdUpstream is no longer needed, but we need to track the state - holdingUpstream = true - } - - // No context parameter - override def onUpstreamFinish(): Unit = { - if (holdingUpstream) absorbTermination() - else completeStage() // ctx.finish turns into completeStage() - } - }) - - setHandler(out, new OutHandler { - override def onPull(): Unit = { - if (elemInFlight != null) { - val e = elemInFlight - elemInFlight = null.asInstanceOf[Out] - pushIt(e) 
- } // holdDownstream is no longer needed - } - }) - - // absorbTermination turns into the code below. - // This emulates the behavior of the AsyncStage stage. - private def absorbTermination(): Unit = - if (isAvailable(shape.out)) getHandler(out).onPull() - - // The line below emulates the behavior of the AsyncStage holdingDownstream - private def holdingDownstream(): Boolean = - !(isClosed(in) || hasBeenPulled(in)) - - // Any method can be used as a callback, we chose the previous name for - // easier comparison with the original code - private def onAsyncInput(input: Try[Out]) = - input match { - case Failure(ex) ⇒ failStage(ex) - case Success(e) if holdingDownstream() ⇒ pushIt(e) - case Success(e) ⇒ - elemInFlight = e - // ctx.ignore is no longer needed - } - - private def pushIt(elem: Out): Unit = { - // ctx.isFinishing turns into isClosed(in) - if (isClosed(in)) { - // pushAndFinish is now two actions - push(out, elem) - completeStage() - } else { - // pushAndPull is now two actions - push(out, elem) - pull(in) - holdingUpstream = false - } - } - } - - } - - //#port-async - - val uri: Uri = ??? - //#raw-query - val queryPart: Option[String] = uri.rawQueryString - //#raw-query - - //#query-param - val param: Option[String] = uri.query().get("a") - //#query-param - - //#file-source-sink - val fileSrc = FileIO.fromFile(new File(".")) - - val otherFileSrc = FileIO.fromFile(new File("."), 1024) - - val someFileSink = FileIO.toFile(new File(".")) - //#file-source-sink - - class SomeInputStream extends java.io.InputStream { override def read(): Int = 0 } - class SomeOutputStream extends java.io.OutputStream { override def write(b: Int): Unit = () } - - //#input-output-stream-source-sink - val inputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream()) - - val otherInputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream()) - - val someOutputStreamSink = StreamConverters.fromOutputStream(() => new SomeOutputStream()) - //#input-output-stream-source-sink - - //#output-input-stream-source-sink - val timeout: FiniteDuration = 0.seconds - - val outputStreamSrc = StreamConverters.asOutputStream() - - val otherOutputStreamSrc = StreamConverters.asOutputStream(timeout) - - val someInputStreamSink = StreamConverters.asInputStream() - - val someOtherInputStreamSink = StreamConverters.asInputStream(timeout) - //#output-input-stream-source-sink + }) + //#expand-state } } } diff --git a/akka-docs/rst/scala/stream/index.rst b/akka-docs/rst/scala/stream/index.rst index 60c7b2f8e0..e872c25093 100644 --- a/akka-docs/rst/scala/stream/index.rst +++ b/akka-docs/rst/scala/stream/index.rst @@ -23,3 +23,4 @@ Streams stream-cookbook ../../general/stream/stream-configuration migration-guide-1.0-2.x-scala + migration-guide-2.0-2.4-scala diff --git a/akka-docs/rst/scala/stream/migration-guide-1.0-2.x-scala.rst b/akka-docs/rst/scala/stream/migration-guide-1.0-2.x-scala.rst index 744a87fd96..c48a2c4f81 100644 --- a/akka-docs/rst/scala/stream/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-1.0-2.x-scala.rst @@ -4,746 +4,6 @@ Migration Guide 1.0 to 2.x ########################## -The 2.0 release contains some structural changes that require some -simple, mechanical source-level changes in client code. While these are detailed below, -there is another change that may have an impact on the runtime behavior of your streams -and which therefore is listed first. +For this migration guide see `the documentation for Akka Streams 2.0`_. 
-Operator Fusion is on by default -================================ - -Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that -the processing steps of a flow or stream graph can be executed within the same Actor and has three -consequences: - - * starting up a stream may take longer than before due to executing the fusion algorithm - * passing elements from one processing stage to the next is a lot faster between fused - stages due to avoiding the asynchronous messaging overhead - * fused stream processing stages do no longer run in parallel to each other, meaning that - only up to one CPU core is used for each fused part - -The first point can be countered by pre-fusing and then reusing a stream blueprint, see ``akka.stream.Fusing``. -In order to balance the effects of the second and third bullet points you will have to insert asynchronous -boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that -shall communicate with the rest of the graph in an asynchronous fashion. - -.. warning:: - - Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer - that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers - may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer - there, data elements are passed without buffering between fused stages. In those cases where buffering - is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the - ``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function. - -The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``. -In that case you can still manually fuse those graphs which shall run on less Actors. Fusable elements are - - * all GraphStages (this includes all built-in junctions apart from ``groupBy``) - * all Stages (this includes all built-in linear operators) - * TCP connections - -Introduced proper named constructor methods instead of ``wrap()`` -================================================================= - -There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of -the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing -the number of overloads) and finding relevant methods in the documentation. - -Creating a Flow from other stages ---------------------------------- - -It was possible to create a ``Flow`` from a graph with the correct shape (``FlowShape``) using ``wrap()``. Now this -must be done with the more descriptive method ``Flow.fromGraph()``. - -It was possible to create a ``Flow`` from a ``Source`` and a ``Sink`` using ``wrap()``. Now this functionality can -be accessed trough the more descriptive methods ``Flow.fromSinkAndSource`` and ``Flow.fromSinkAndSourceMat``. - - -Creating a BidiFlow from other stages -------------------------------------- - -It was possible to create a ``BidiFlow`` from a graph with the correct shape (``BidiShape``) using ``wrap()``. Now this -must be done with the more descriptive method ``BidiFlow.fromGraph()``. - -It was possible to create a ``BidiFlow`` from two ``Flow`` s using ``wrap()``. Now this functionality can -be accessed trough the more descriptive methods ``BidiFlow.fromFlows`` and ``BidiFlow.fromFlowsMat``. 
- -It was possible to create a ``BidiFlow`` from two functions using ``apply()`` (Scala DSL) or ``create()`` (Java DSL). -Now this functionality can be accessed trough the more descriptive method ``BidiFlow.fromFunctions``. - -Update procedure ----------------- - -1. Replace all uses of ``Flow.wrap`` when it converts a ``Graph`` to a ``Flow`` with ``Flow.fromGraph`` -2. Replace all uses of ``Flow.wrap`` when it converts a ``Source`` and ``Sink`` to a ``Flow`` with - ``Flow.fromSinkAndSource`` or ``Flow.fromSinkAndSourceMat`` -3. Replace all uses of ``BidiFlow.wrap`` when it converts a ``Graph`` to a ``BidiFlow`` with ``BidiFlow.fromGraph`` -4. Replace all uses of ``BidiFlow.wrap`` when it converts two ``Flow`` s to a ``BidiFlow`` with - ``BidiFlow.fromFlows`` or ``BidiFlow.fromFlowsMat`` -5. Replace all uses of ``BidiFlow.apply()`` when it converts two - functions to a ``BidiFlow`` with ``BidiFlow.fromFunctions`` - -Example -^^^^^^^ - -:: - - val graphSource: Graph[SourceShape[Int], Unit] = ??? - // This no longer works! - val source: Source[Int, Unit] = Source.wrap(graphSource) - - val graphSink: Graph[SinkShape[Int], Unit] = ??? - // This no longer works! - val sink: Sink[Int, Unit] = Sink.wrap(graphSink) - - val graphFlow: Graph[FlowShape[Int, Int], Unit] = ??? - // This no longer works! - val flow: Flow[Int, Int, Unit] = Flow.wrap(graphFlow) - - // This no longer works - Flow.wrap(Sink.head[Int], Source.single(0))(Keep.left) - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsScala.scala#flow-wrap - -and - -:: - - val bidiGraph: Graph[BidiShape[Int, Int, Int, Int], Unit = ??? - // This no longer works! - val bidi: BidiFlow[Int, Int, Int, Int, Unit] = BidiFlow.wrap(bidiGraph) - - // This no longer works! - BidiFlow.wrap(flow1, flow2)(Keep.both) - - // This no longer works! - BidiFlow((x: Int) => x + 1, (y: Int) => y * 3) - - -Should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsScala.scala#bidiflow-wrap - -FlowGraph class and builder methods have been renamed -===================================================== - -Due to incorrect overlap with the :class:`Flow` concept we renamed the :class:`FlowGraph` class to :class:`GraphDSL`. -There is now only one graph creation method called ``create`` which is analogous to the old ``partial`` method. For -closed graphs now it is explicitly required to return ``ClosedShape`` at the end of the builder block. - -Update procedure ----------------- - -1. Search and replace all occurrences of ``FlowGraph`` with ``GraphDSL``. -2. Replace all occurrences of ``GraphDSL.partial()`` or ``GraphDSL.closed()`` with ``GraphDSL.create()``. -3. Add ``ClosedShape`` as a return value of the builder block if it was ``FlowGraph.closed()`` before. -4. Wrap the closed graph with ``RunnableGraph.fromGraph`` if it was ``FlowGraph.closed()`` before. - -Example -^^^^^^^ - -:: - - // This no longer works! - FlowGraph.closed() { builder => - //... - } - - // This no longer works! - FlowGraph.partial() { builder => - //... - FlowShape(inlet, outlet) - } - -should be replaced by - -.. includecode:: ../code/docs/stream/MigrationsScala.scala#graph-create - -Methods that create Source, Sink, Flow from Graphs have been removed -==================================================================== - -Previously there were convenience methods available on ``Sink``, ``Source``, ``Flow`` an ``BidiFlow`` to create -these DSL elements from a graph builder directly. 
-Now this requires two explicit steps to reduce the number of overloaded methods (which helps Java 8 type inference)
-and to reduce the number of ways in which these elements can be created. There is only one graph creation method to
-learn (``GraphDSL.create``) and then there is only one conversion method to use: ``fromGraph()``.
-
-This means that the following methods have been removed:
-
- - ``adapt()`` method on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (both DSLs)
- - ``apply()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Scala DSL)
- - ``create()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Java DSL)
-
-Update procedure
-----------------
-
-Every place where a ``Source``, ``Sink``, ``Flow`` or ``BidiFlow`` is created from a graph using a builder has to
-be replaced with two steps:
-
-1. Create a ``Graph`` with the correct ``Shape`` using ``GraphDSL.create`` (e.g. for a ``Source`` this means first
-   creating a ``Graph`` with a ``SourceShape``)
-2. Create the required DSL element by calling ``fromGraph()`` on the corresponding DSL element (e.g. ``Source.fromGraph``),
-   passing the graph created in the previous step
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  Source() { builder =>
-    //...
-    outlet
-  }
-
-  // This no longer works!
-  Sink() { builder =>
-    //...
-    inlet
-  }
-
-  // This no longer works!
-  Flow() { builder =>
-    //...
-    (inlet, outlet)
-  }
-
-  // This no longer works!
-  BidiFlow() { builder =>
-    //...
-    BidiShape(inlet1, outlet1, inlet2, outlet2)
-  }
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#graph-create-2
-
-Several Graph builder methods have been removed
-===============================================
-
-The ``addEdge`` methods have been removed from the DSL to reduce the ways connections can be made and to reduce the
-number of overloads. Now only the ``~>`` notation is available, which requires the import of the implicits
-``GraphDSL.Implicits._``.
-
-Update procedure
-----------------
-
-1. Replace all uses of ``scaladsl.Builder.addEdge(Outlet, Inlet)`` with the graphical DSL ``~>``.
-2. Replace all uses of ``scaladsl.Builder.addEdge(Outlet, FlowShape, Inlet)`` with the graphical DSL ``~>``.
-3. Import ``GraphDSL.Implicits._`` in the builder block or an enclosing scope.
-
-Example
-^^^^^^^
-
-::
-
-  FlowGraph.closed() { builder =>
-    //...
-    // This no longer works!
-    builder.addEdge(outlet, inlet)
-    // This no longer works!
-    builder.addEdge(outlet, flow1, inlet)
-    //...
-  }
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#graph-edges
-
-Source constructor name changes
-===============================
-
-``Source.lazyEmpty`` has been replaced by ``Source.maybe``, which materializes a ``Promise`` that can be completed
-with zero or one element by providing an ``Option``. This is different from ``lazyEmpty``, which only allowed
-completion to be sent, but no elements.
-
-The ``apply()`` overloads on ``Source`` have been refactored into separate methods to reduce the number of overloads
-and make source creation more discoverable.
-
-``Source.subscriber`` has been renamed to ``Source.asSubscriber``.
-
-Update procedure
-----------------
-
-1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with
-   a ``None`` (an empty ``Option``)
-2. Replace all uses of ``Source(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)``
-3. Replace all uses of ``Source(publisher)`` with the method ``Source.fromPublisher(publisher)``
-4. Replace all uses of ``Source(() => iterator)`` with the method ``Source.fromIterator(() => iterator)``
-5. Replace all uses of ``Source(future)`` with the method ``Source.fromFuture(future)``
-6. Replace all uses of ``Source.subscriber`` with the method ``Source.asSubscriber``
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  val src: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]
-  //...
-  promise.trySuccess(())
-
-  // This no longer works!
-  val ticks = Source(1.second, 3.seconds, "tick")
-
-  // This no longer works!
-  val pubSource = Source(TestPublisher.manualProbe[Int]())
-
-  // This no longer works!
-  val itSource = Source(() => Iterator.continually(Random.nextGaussian))
-
-  // This no longer works!
-  val futSource = Source(Future.successful(42))
-
-  // This no longer works!
-  val subSource = Source.subscriber
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#source-creators
-
-Sink constructor name changes
-=============================
-
-``Sink.apply(subscriber)`` has been renamed to ``Sink.fromSubscriber(subscriber)`` to reduce the number of overloads
-and make sink creation more discoverable.
-
-Update procedure
-----------------
-
-1. Replace all uses of ``Sink(subscriber)`` with the method ``Sink.fromSubscriber(subscriber)``
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  val subSink = Sink(TestSubscriber.manualProbe[Int]())
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#sink-creators
-
-``flatten(FlattenStrategy)`` has been replaced by named counterparts
-====================================================================
-
-To simplify type inference in Java 8 and to make the method more discoverable, ``flatten(FlattenStrategy.concat)``
-has been removed and replaced with the named method ``flatMapConcat``.
-
-Update procedure
-----------------
-
-1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)``
-2. Consider replacing all occurrences of ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)``
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  Flow[Source[Int, Any]].flatten(FlattenStrategy.concat)
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#flatMapConcat
-
-``Sink.fanoutPublisher()`` and ``Sink.publisher()`` are now a single method
-===========================================================================
-
-It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support
-a single ``Subscriber``, and the discoverability of the appropriate fix (``Sink.fanoutPublisher``) was non-obvious.
-To make the decision whether to support fanout or not an active one, the aforementioned methods have been
-replaced with a single method: ``Sink.asPublisher(fanout: Boolean)``.
-
-Update procedure
-----------------
-
-1. Replace all occurrences of ``Sink.publisher`` with ``Sink.asPublisher(false)``
-2. Replace all occurrences of ``Sink.fanoutPublisher`` with ``Sink.asPublisher(true)``
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  val pubSink = Sink.publisher
-
-  // This no longer works!
-  val pubSinkFanout = Sink.fanoutPublisher(2, 8)
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#sink-as-publisher
-
-FlexiMerge and FlexiRoute have been replaced by GraphStage
-==========================================================
-
-The ``FlexiMerge`` and ``FlexiRoute`` DSLs have been removed since they provided an abstraction that was too limiting.
-A better abstraction, called ``GraphStage``, has been created instead. ``GraphStage`` can express fan-in and
-fan-out stages, but also many other constructs with possibly multiple input and output ports (e.g. a ``BidiStage``).
-
-This new abstraction provides a more uniform way to create custom stream processing stages of arbitrary ``Shape``. In
-fact, all of the built-in fan-in and fan-out stages are now implemented in terms of ``GraphStage``.
-
-Update procedure
-----------------
-
-*There is no simple update procedure. The affected stages must be ported to the new ``GraphStage`` DSL manually. Please
-read the* ``GraphStage`` *documentation (TODO) for details.*
-
-GroupBy, SplitWhen and SplitAfter now return SubFlow
-=====================================================
-
-Previously the ``groupBy``, ``splitWhen``, and ``splitAfter``
-combinators returned a type that included a :class:`Source` within its
-elements. Transforming these substreams was only possible by nesting
-the respective combinators inside a ``map`` of the outer stream.
-While this design enabled maximum flexibility for handling substreams,
-it ultimately made it too easy to create a (potentially surprising)
-deadlock. You can read more in the `SubFlow-Motivation-Thread`_.
-
-These operations have been made more convenient and also safer by
-transforming the substreams directly instead: the return
-type is now a :class:`SubFlow` that does not implement the
-:class:`Graph` interface and therefore only represents an unfinished
-intermediate builder step. The substream mode can be ended by closing
-the substreams (i.e. attaching a :class:`Sink`) or by merging them back
-together.
-
-.. _SubFlow-Motivation-Thread: https://groups.google.com/d/msg/akka-user/_blLOcIHxJ4/i1DOoylmEgAJ
-
-Update procedure
-----------------
-
-The transformations that were done on the substreams need to be lifted
-up one level. This only works for cases where the processing topology
-is homogeneous for all substreams. If your substream processing
-topology is heterogeneous, consider creating a graph (see
-:ref:`stream-graph-scala`).
-
-Example
-^^^^^^^
-
-::
-
-  Flow[Int]
-    // This no longer works!
-    .groupBy(_ % 2)
-    // This no longer works!
-    .map {
-      case (key, source) => source.map(_ + 3)
-    }
-    // This no longer works!
-    .flatten(FlattenStrategy.concat)
-
-This is now implemented as
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#group-flatten
-
-Example 2
-^^^^^^^^^
-
-::
-
-  Flow[String]
-    // This no longer works!
-    .groupBy(identity)
-    // This no longer works!
-    .map {
-      case (key, source) => source.runFold((key, 0))((pair, word) => (key, pair._2 + 1))
-    }
-    // This no longer works!
-    .mapAsyncUnordered(4, identity)
-
-This is now implemented as
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#group-fold
-
-Variance of Inlet and Outlet
-============================
-
-Scala uses *declaration site variance*, which was cumbersome in the case of ``Inlet`` and ``Outlet`` as they are
-purely symbolic objects containing no fields or methods and are used both in input and output positions (wiring
-an ``Outlet`` into an ``Inlet``; reading in a stage from an ``Inlet``).
-For these reasons all users of these port abstractions now use *use-site variance* (just like variance works in Java).
-In general this does not affect user code except in the case of custom shapes, which now require ``@uncheckedVariance``
-annotations on their ``Inlet`` and ``Outlet`` members (since these are now invariant, but the Scala compiler does not
-know that they have no fields or methods that would violate variance constraints).
-
-This change does not affect Java DSL users.
-
-Update procedure
-----------------
-
-1. All custom shapes must use ``@uncheckedVariance`` on their ``Inlet`` and ``Outlet`` members.
-
-Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape``
-==================================================================================================================
-
-The input and output ports of these shapes were called ``inlet()`` and ``outlet()``, whereas other shapes
-consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``.
-
-Update procedure
-----------------
-
-Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the
-ports of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`.
-
-Semantic change in ``isHoldingUpstream`` in the DetachedStage DSL
-=================================================================
-
-The ``isHoldingUpstream`` method used to return ``true`` if the upstream port was in the holding state when a completion
-arrived (inside the ``onUpstreamFinish`` callback). Now it returns ``false`` when the upstream is completed.
-
-Update procedure
-----------------
-
-1. Those stages that relied on the previous behavior need to introduce an extra ``Boolean`` field with the initial
-   value ``false``
-2. This field must be set on every call to ``holdUpstream()`` (and its variants).
-3. On completion, read this field instead of calling ``isHoldingUpstream``.
-
-See the AsyncStage migration section for an example of this procedure.
-
-StatefulStage has been replaced by GraphStage
-=============================================
-
-The :class:`StatefulStage` class had some flaws and limitations, most notably around completion handling, which
-caused subtle bugs. The new :class:`GraphStage` (:ref:`graphstage-java`) solves these issues and should be used
-instead.
-
-Update procedure
-----------------
-
-There is no mechanical update procedure available. Please consult the :class:`GraphStage` documentation
-(:ref:`graphstage-java`).
-
-AsyncStage has been replaced by GraphStage
-==========================================
-
-Due to its complexity and inflexibility, ``AsyncStage`` has been removed in favor of ``GraphStage``. Existing
-``AsyncStage`` implementations can be ported in a mostly mechanical way.
-
-Update procedure
-----------------
-
-1. The subclass of ``AsyncStage`` should be replaced by a subclass of ``GraphStage``
-2. The new subclass must define an ``in`` and an ``out`` port (``Inlet`` and ``Outlet`` instances) and override the
-   ``shape`` method, returning a ``FlowShape``
-3. An instance of ``GraphStageLogic`` must be returned by overriding ``createLogic()``. The original processing logic
-   and state will be encapsulated in this ``GraphStageLogic``
-4. Using ``setHandler(port, handler)``, an ``InHandler`` instance should be set on ``in`` and an ``OutHandler`` should
-   be set on ``out``
-5. ``onPush``, ``onUpstreamFinish`` and ``onUpstreamFailure`` are now available in the ``InHandler`` subclass created
-   by the user
-6. ``onPull`` and ``onDownstreamFinish`` are now available in the ``OutHandler`` subclass created by the user
-7. The callbacks above no longer take an extra ``ctx`` context parameter.
-8. ``onPush`` only signals the stage; the actual element can be obtained by calling ``grab(in)``
-9. ``ctx.push(elem)`` is now ``push(out, elem)``
-10. ``ctx.pull()`` is now ``pull(in)``
-11. ``ctx.finish()`` is now ``completeStage()``
-12. ``ctx.pushAndFinish(elem)`` is now simply two calls: ``push(out, elem); completeStage()``
-13. ``ctx.fail(cause)`` is now ``failStage(cause)``
-14. ``ctx.isFinishing()`` is now ``isClosed(in)``
-15. ``ctx.absorbTermination()`` can be replaced with ``if (isAvailable(shape.outlet)) ...``
-16. ``ctx.pushAndPull(elem)`` can be replaced with ``push(out, elem); pull(in)``
-17. ``ctx.holdUpstreamAndPush`` and ``ctx.holdDownstreamAndPull`` can be replaced by simply ``push(out, elem)`` and
-    ``pull(in)`` respectively
-18. The following calls should be removed: ``ctx.ignore()``, ``ctx.holdUpstream()`` and ``ctx.holdDownstream()``.
-19. ``ctx.isHoldingUpstream()`` can be replaced with ``isAvailable(out)``
-20. ``ctx.isHoldingDownstream()`` can be replaced with ``!(isClosed(in) || hasBeenPulled(in))``
-21. ``ctx.getAsyncCallback()`` is now ``getAsyncCallback(callback)``, which takes a callback as a parameter. The
-    callback corresponds to the ``onAsyncInput()`` callback of the original ``AsyncStage``
-
-We show the necessary steps in terms of an example ``AsyncStage``.
-
-Example
-^^^^^^^
-
-::
-
-  class MapAsyncOne[In, Out](f: In ⇒ Future[Out])(implicit ec: ExecutionContext)
-    extends AsyncStage[In, Out, Try[Out]] {
-
-    private var elemInFlight: Out = _
-
-    override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = {
-      val future = f(elem)
-      val cb = ctx.getAsyncCallback
-      future.onComplete(cb.invoke)
-      ctx.holdUpstream()
-    }
-
-    override def onPull(ctx: AsyncContext[Out, Try[Out]]) =
-      if (elemInFlight != null) {
-        val e = elemInFlight
-        elemInFlight = null.asInstanceOf[Out]
-        pushIt(e, ctx)
-      } else ctx.holdDownstream()
-
-    override def onAsyncInput(input: Try[Out], ctx: AsyncContext[Out, Try[Out]]) =
-      input match {
-        case Failure(ex)                           ⇒ ctx.fail(ex)
-        case Success(e) if ctx.isHoldingDownstream ⇒ pushIt(e, ctx)
-        case Success(e) ⇒
-          elemInFlight = e
-          ctx.ignore()
-      }
-
-    override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]) =
-      if (ctx.isHoldingUpstream) ctx.absorbTermination()
-      else ctx.finish()
-
-    private def pushIt(elem: Out, ctx: AsyncContext[Out, Try[Out]]) =
-      if (ctx.isFinishing) ctx.pushAndFinish(elem)
-      else ctx.pushAndPull(elem)
-  }
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#port-async
-
-Akka HTTP: Uri parsing mode relaxed-with-raw-query replaced with rawQueryString
-===============================================================================
-
-Previously Akka HTTP allowed configuring the parsing mode of a Uri's query part (``?a=b&c=d``) as
-``relaxed-with-raw-query``, which is useful when Uris are not formatted using the usual "key/value pairs" syntax.
-
-Instead of exposing this as an option for the parser, the raw query string is now available through the
-``rawQueryString(): Option[String]`` / ``queryString(): Option[String]`` methods on ``model.Uri``.
-
-For parsing the query part use ``query(charset: Charset = UTF8, mode: Uri.ParsingMode = Uri.ParsingMode.Relaxed): Query``.
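-
-As a rough sketch of the new accessors (assuming ``uri`` is an ``akka.http.scaladsl.model.Uri`` value)::
-
-  // the query string exactly as it appeared in the request, if present
-  val raw: Option[String] = uri.rawQueryString
-  // explicit, on-demand parsing of the query into key/value pairs
-  val a: Option[String] = uri.query().get("a")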
-
-Update procedure
-----------------
-
-1. If the ``uri-parsing-mode`` was set to ``relaxed-with-raw-query``, remove it
-2. In places where the query string was accessed in ``relaxed-with-raw-query`` mode, use the
-   ``rawQueryString``/``queryString`` methods instead
-3. In places where the parsed query parts (such as ``parameter``) were used, invoke parsing directly using
-   ``uri.query().get("a")``
-
-Example
-^^^^^^^
-
-::
-
-  // config, no longer works
-  akka.http.parsing.uri-parsing-mode = relaxed-with-raw-query
-
-should be replaced by:
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#raw-query
-
-And use of query parameters from ``Uri`` that looked like this:
-
-::
-
-  // This no longer works!
-  uri.parameter("name")
-
-should be replaced by:
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#query-param
-
-SynchronousFileSource and SynchronousFileSink
-=============================================
-
-``SynchronousFileSource`` and ``SynchronousFileSink``
-have been replaced by ``FileIO.fromFile(…)`` and ``FileIO.toFile(…)`` due to discoverability issues
-paired with names which leaked internal implementation details.
-
-Update procedure
-----------------
-
-Replace ``SynchronousFileSource(`` and ``SynchronousFileSource.apply(`` with ``FileIO.fromFile(``
-
-Replace ``SynchronousFileSink(`` and ``SynchronousFileSink.apply(`` with ``FileIO.toFile(``
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  val fileSrc = SynchronousFileSource(new File("."))
-
-  // This no longer works!
-  val otherFileSrc = SynchronousFileSource(new File("."), 1024)
-
-  // This no longer works!
-  val someFileSink = SynchronousFileSink(new File("."))
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#file-source-sink
-
-InputStreamSource and OutputStreamSink
-======================================
-
-Both have been replaced by ``StreamConverters.fromInputStream(…)`` and ``StreamConverters.fromOutputStream(…)`` due to
-discoverability issues.
-
-Update procedure
-----------------
-
-Replace ``InputStreamSource(`` and ``InputStreamSource.apply(`` with ``StreamConverters.fromInputStream(``
-
-Replace ``OutputStreamSink(`` and ``OutputStreamSink.apply(`` with ``StreamConverters.fromOutputStream(``
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  val inputStreamSrc = InputStreamSource(() => new SomeInputStream())
-
-  // This no longer works!
-  val otherInputStreamSrc = InputStreamSource(() => new SomeInputStream(), 1024)
-
-  // This no longer works!
-  val someOutputStreamSink = OutputStreamSink(() => new SomeOutputStream())
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#input-output-stream-source-sink
-
-OutputStreamSource and InputStreamSink
-======================================
-
-Both have been replaced by ``StreamConverters.asOutputStream(…)`` and ``StreamConverters.asInputStream(…)`` due to
-discoverability issues.
-
-Update procedure
-----------------
-
-Replace ``OutputStreamSource(`` and ``OutputStreamSource.apply(`` with ``StreamConverters.asOutputStream(``
-
-Replace ``InputStreamSink(`` and ``InputStreamSink.apply(`` with ``StreamConverters.asInputStream(``
-
-Example
-^^^^^^^
-
-::
-
-  // This no longer works!
-  val outputStreamSrc = OutputStreamSource()
-
-  // This no longer works!
-  val otherOutputStreamSrc = OutputStreamSource(timeout)
-
-  // This no longer works!
-  val someInputStreamSink = InputStreamSink()
-
-  // This no longer works!
-  val someOtherInputStreamSink = InputStreamSink(timeout)
-
-should be replaced by
-
-.. includecode:: ../code/docs/stream/MigrationsScala.scala#output-input-stream-source-sink
+.. _`the documentation for Akka Streams 2.0`: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/migration-guide-1.0-2.x-scala.html
diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst
new file mode 100644
index 0000000000..561fead548
--- /dev/null
+++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst
@@ -0,0 +1,73 @@
+.. _migration-streams-2.0-2.4-scala:
+
+##############################
+Migration Guide 2.0.x to 2.4.x
+##############################
+
+General notes
+=============
+
+akka.Done and akka.NotUsed replacing Unit and BoxedUnit
+-------------------------------------------------------
+
+To provide clearer signatures and a unified API for both
+Java and Scala, two new types have been introduced:
+
+``akka.NotUsed`` is meant to be used instead of ``Unit`` in Scala
+and ``BoxedUnit`` in Java to signify that the type parameter is required
+but not actually used. This is commonly the case with ``Source``, ``Flow`` and ``Sink``
+that do not materialize into any value.
+
+``akka.Done`` is added for the use case where it is boxed inside another object to signify
+completion but there is no actual value attached to the completion. It is used to replace
+occurrences of ``Future<BoxedUnit>`` with ``Future<Done>`` in Java and ``Future[Unit]`` with
+``Future[Done]`` in Scala.
+
+All previous usage of ``Unit`` and ``BoxedUnit`` for these two cases in the Akka Streams APIs
+has been updated.
+
+This means that Scala code like this::
+
+  val source: Source[Int, Unit] = Source(1 to 5)
+  val sink: Sink[Int, Future[Unit]] = Sink.ignore
+
+needs to be changed into::
+
+  val source: Source[Int, NotUsed] = Source(1 to 5)
+  val sink: Sink[Int, Future[Done]] = Sink.ignore
+
+These changes apply to all the places where streams are used, which means that signatures
+in the persistent query APIs are also affected.
+
+Changed Operators
+=================
+
+``expand()`` is now based on an Iterator
+----------------------------------------
+
+Previously the ``expand`` combinator required two functions as input: the first
+one lifted incoming values into an extrapolation state and the second one
+extracted values from that, possibly evolving that state. This has been
+simplified into a single function that turns the incoming element into an
+Iterator.
+
+The most prominent use-case previously was to just repeat the previously received value::
+
+  Flow[Int].expand(identity)(s => (s, s)) // This no longer works!
+
+In Akka 2.4.x this is simplified to:
+
+.. includecode:: ../code/docs/stream/MigrationsScala.scala#expand-continually
+
+If state needs to be kept during the expansion process then this state will
+need to be managed by the Iterator. The example of counting the number of
+expansions might previously have looked like::
+
+  // This no longer works!
+  Flow[Int].expand((_, 0)){ case (in, count) => (in, count) -> (in, count + 1) }
+
+In Akka 2.4.x this is formulated like so:
+
+.. includecode:: ../code/docs/stream/MigrationsScala.scala#expand-state
+
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index ca1e61324b..c76740775e 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -1312,7 +1312,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
   def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] =
     this.viaMat(Flow.fromGraph(GraphDSL.create(that,
-      new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] {
+      new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] {
       def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = {
         val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T])
         b.from(s).toInlet(zip.in1)