add migration guides for Scala and Java

also fix missing includes and wrong file locations
This commit is contained in:
Roland Kuhn 2016-01-19 20:26:43 +01:00
parent 7463c50fc9
commit 063e289718
12 changed files with 179 additions and 2027 deletions

View file

@ -36,6 +36,7 @@ The Current List of Modules
peek-mailbox peek-mailbox
aggregator aggregator
receive-pipeline receive-pipeline
circuitbreaker
Suggested Way of Using these Contributions Suggested Way of Using these Contributions
------------------------------------------ ------------------------------------------

View file

@ -3,258 +3,22 @@
*/ */
package docs.stream; package docs.stream;
import akka.NotUsed; import java.util.stream.Stream;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.http.javadsl.model.Uri;
import akka.dispatch.Futures;
import akka.japi.function.Creator;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.function.Function;
import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.util.ByteString;
import scala.Option;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.Promise;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.nio.charset.Charset;
public class MigrationsJava { public class MigrationsJava {
// This is compile-only code, no need for actually running anything.
public static ActorMaterializer mat = null;
public static ActorSystem sys = null;
public static class SomeInputStream extends InputStream {
public SomeInputStream() {
}
@Override
public int read() throws IOException {
return 0;
}
}
public static class SomeOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
return;
}
}
public static void main(String[] args) { public static void main(String[] args) {
//#expand-continually
Outlet<Integer> outlet = null; Flow.of(Integer.class).expand(in -> Stream.iterate(in, i -> i).iterator());
//#expand-continually
Outlet<Integer> outlet1 = null; //#expand-state
Outlet<Integer> outlet2 = null; Flow.of(Integer.class).expand(in ->
Stream.iterate(new Pair<>(in, 0),
Inlet<Integer> inlet = null; p -> new Pair<>(in, p.second() + 1)).iterator());
//#expand-state
Inlet<Integer> inlet1 = null;
Inlet<Integer> inlet2 = null;
Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class);
Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class);
Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class);
Promise<Option<Integer>> promise = null;
{
Graph<SourceShape<Integer>, NotUsed> graphSource = null;
Graph<SinkShape<Integer>, NotUsed> graphSink = null;
Graph<FlowShape<Integer, Integer>, NotUsed> graphFlow = null;
//#flow-wrap
Source<Integer, NotUsed> source = Source.fromGraph(graphSource);
Sink<Integer, NotUsed> sink = Sink.fromGraph(graphSink);
Flow<Integer, Integer, NotUsed> aflow = Flow.fromGraph(graphFlow);
Flow.fromSinkAndSource(Sink.<Integer>head(), Source.single(0));
Flow.fromSinkAndSourceMat(Sink.<Integer>head(), Source.single(0), Keep.left());
//#flow-wrap
Graph<BidiShape<Integer, Integer, Integer, Integer>, NotUsed> bidiGraph = null;
//#bidi-wrap
BidiFlow<Integer, Integer, Integer, Integer, NotUsed> bidiFlow =
BidiFlow.fromGraph(bidiGraph);
BidiFlow.fromFlows(flow1, flow2);
BidiFlow.fromFlowsMat(flow1, flow2, Keep.both());
//#bidi-wrap
}
{
//#graph-create
GraphDSL.create(builder -> {
//...
return ClosedShape.getInstance();
});
GraphDSL.create(builder -> {
//...
return new FlowShape<>(inlet, outlet);
});
//#graph-create
}
{
//#graph-create-2
GraphDSL.create(builder -> {
//...
return SourceShape.of(outlet);
});
GraphDSL.create(builder -> {
//...
return SinkShape.of(inlet);
});
GraphDSL.create(builder -> {
//...
return FlowShape.of(inlet, outlet);
});
GraphDSL.create(builder -> {
//...
return BidiShape.of(inlet1, outlet1, inlet2, outlet2);
});
//#graph-create-2
}
{
//#graph-builder
GraphDSL.create(builder -> {
builder.from(outlet).toInlet(inlet);
builder.from(outlet).via(builder.add(flow)).toInlet(inlet);
builder.from(builder.add(Source.single(0))).to(builder.add(Sink.head()));
//...
return ClosedShape.getInstance();
});
//#graph-builder
}
//#source-creators
Source<Integer, Promise<Optional<Integer>>> src = Source.<Integer>maybe();
// Complete the promise with an empty option to emulate the old lazyEmpty
promise.trySuccess(scala.Option.empty());
final Source<String, Cancellable> ticks = Source.tick(
FiniteDuration.create(0, TimeUnit.MILLISECONDS),
FiniteDuration.create(200, TimeUnit.MILLISECONDS),
"tick");
final Source<Integer, NotUsed> pubSource =
Source.fromPublisher(TestPublisher.<Integer>manualProbe(true, sys));
final Source<Integer, NotUsed> futSource =
Source.fromFuture(Futures.successful(42));
final Source<Integer, Subscriber<Integer>> subSource =
Source.<Integer>asSubscriber();
//#source-creators
//#sink-creators
final Sink<Integer, NotUsed> subSink =
Sink.fromSubscriber(TestSubscriber.<Integer>manualProbe(sys));
//#sink-creators
//#sink-as-publisher
final Sink<Integer, Publisher<Integer>> pubSink =
Sink.<Integer>asPublisher(false);
final Sink<Integer, Publisher<Integer>> pubSinkFanout =
Sink.<Integer>asPublisher(true);
//#sink-as-publisher
//#empty-flow
Flow<Integer, Integer, NotUsed> emptyFlow = Flow.<Integer>create();
// or
Flow<Integer, Integer, NotUsed> emptyFlow2 = Flow.of(Integer.class);
//#empty-flow
//#flatMapConcat
Flow.<Source<Integer, NotUsed>>create().
<Integer, NotUsed>flatMapConcat(i -> i);
//#flatMapConcat
//#group-flatten
Flow.of(Integer.class)
.groupBy(2, in -> in % 2) // the first parameter sets max number of substreams
.map(subIn -> + 3)
.concatSubstreams();
//#group-flatten
final int maxDistinctWords = 1000;
//#group-fold
Flow.of(String.class)
.groupBy(maxDistinctWords, i -> i)
.fold(Pair.create("", 0), (pair, word) -> Pair.create(word, pair.second() + 1))
.mergeSubstreams();
//#group-fold
Uri uri = null;
//#raw-query
final Optional<String> theRawQueryString = uri.rawQueryString();
//#raw-query
//#query-param
final Optional<String> aQueryParam = uri.query().get("a");
//#query-param
//#file-source-sink
final Source<ByteString, Future<Long>> fileSrc =
FileIO.fromFile(new File("."));
final Source<ByteString, Future<Long>> otherFileSrc =
FileIO.fromFile(new File("."), 1024);
final Sink<ByteString, Future<Long>> fileSink =
FileIO.toFile(new File("."));
//#file-source-sink
//#input-output-stream-source-sink
final Source<ByteString, Future<java.lang.Long>> inputStreamSrc =
StreamConverters.fromInputStream((Creator<InputStream>) () -> new SomeInputStream());
final Source<ByteString, Future<java.lang.Long>> otherInputStreamSrc =
StreamConverters.fromInputStream((Creator<InputStream>) () -> new SomeInputStream(), 1024);
final Sink<ByteString, Future<java.lang.Long>> outputStreamSink =
StreamConverters.fromOutputStream((Creator<OutputStream>) () -> new SomeOutputStream());
//#input-output-stream-source-sink
//#output-input-stream-source-sink
final FiniteDuration timeout = FiniteDuration.Zero();
final Source<ByteString, OutputStream> outputStreamSrc =
StreamConverters.asOutputStream();
final Source<ByteString, OutputStream> otherOutputStreamSrc =
StreamConverters.asOutputStream(timeout);
final Sink<ByteString, InputStream> someInputStreamSink =
StreamConverters.asInputStream();
final Sink<ByteString, InputStream> someOtherInputStreamSink =
StreamConverters.asInputStream(timeout);
//#output-input-stream-source-sink
} }
} }

View file

@ -23,3 +23,4 @@ Streams
stream-cookbook stream-cookbook
../../general/stream/stream-configuration ../../general/stream/stream-configuration
migration-guide-1.0-2.x-java migration-guide-1.0-2.x-java
migration-guide-2.0-2.4-java

View file

@ -4,722 +4,6 @@
Migration Guide 1.0 to 2.x Migration Guide 1.0 to 2.x
########################## ##########################
The 2.0 release contains some structural changes that require some For this migration guide see `the documentation for Akka Streams 2.0`_.
simple, mechanical source-level changes in client code. While these are detailed below,
there is another change that may have an impact on the runtime behavior of your streams
and which therefore is listed first.
Operator Fusion is on by default .. _`the documentation for Akka Streams 2.0`: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/java/migration-guide-1.0-2.x-java.html
================================
Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that
the processing steps of a flow or stream graph can be executed within the same Actor and has three
consequences:
* starting up a stream may take longer than before due to executing the fusion algorithm
* passing elements from one processing stage to the next is a lot faster between fused
stages due to avoiding the asynchronous messaging overhead
* fused stream processing stages do no longer run in parallel to each other, meaning that
only up to one CPU core is used for each fused part
The first point can be countered by pre-fusing and then reusing a stream blueprint, see ``akka.stream.Fusing``.
In order to balance the effects of the second and third bullet points you will have to insert asynchronous
boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that
shall communicate with the rest of the graph in an asynchronous fashion.
.. warning::
Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer
that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers
may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer
there, data elements are passed without buffering between fused stages. In those cases where buffering
is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the
``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function.
The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``.
In that case you can still manually fuse those graphs which shall run on less Actors. Fusable elements are
* all GraphStages (this includes all built-in junctions apart from ``groupBy``)
* all Stages (this includes all built-in linear operators)
* TCP connections
Introduced proper named constructor methods instead of ``wrap()``
=================================================================
There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of
the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing
the number of overloads) and finding relevant methods in the documentation.
Creating a Flow from other stages
---------------------------------
It was possible to create a ``Flow`` from a graph with the correct shape (``FlowShape``) using ``wrap()``. Now this
must be done with the more descriptive method ``Flow.fromGraph()``.
It was possible to create a ``Flow`` from a ``Source`` and a ``Sink`` using ``wrap()``. Now this functionality can
be accessed trough the more descriptive methods ``Flow.fromSinkAndSource`` and ``Flow.fromSinkAndSourceMat``.
Creating a BidiFlow from other stages
-------------------------------------
It was possible to create a ``BidiFlow`` from a graph with the correct shape (``BidiShape``) using ``wrap()``. Now this
must be done with the more descriptive method ``BidiFlow.fromGraph()``.
It was possible to create a ``BidiFlow`` from two ``Flow`` s using ``wrap()``. Now this functionality can
be accessed trough the more descriptive methods ``BidiFlow.fromFlows`` and ``BidiFlow.fromFlowsMat``.
Update procedure
----------------
1. Replace all uses of ``Flow.wrap`` when it converts a ``Graph`` to a ``Flow`` with ``Flow.fromGraph``
2. Replace all uses of ``Flow.wrap`` when it converts a ``Source`` and ``Sink`` to a ``Flow`` with
``Flow.fromSinkAndSource`` or ``Flow.fromSinkAndSourceMat``
3. Replace all uses of ``BidiFlow.wrap`` when it converts a ``Graph`` to a ``BidiFlow`` with ``BidiFlow.fromGraph``
4. Replace all uses of ``BidiFlow.wrap`` when it converts two ``Flow`` s to a ``BidiFlow`` with
``BidiFlow.fromFlows`` or ``BidiFlow.fromFlowsMat``
5. Replace all uses of ``BidiFlow.apply()`` (Scala DSL) or ``BidiFlow.create()`` (Java DSL) when it converts two
functions to a ``BidiFlow`` with ``BidiFlow.fromFunctions``
Example
^^^^^^^
::
Graph<SourceShape<Integer>, BoxedUnit> graphSource = ...;
// This no longer works!
Source<Integer, BoxedUnit> source = Source.wrap(graphSource);
Graph<SinkShape<Integer>, BoxedUnit> graphSink = ...;
// This no longer works!
Sink<Integer, BoxedUnit> sink = Sink.wrap(graphSink);
Graph<FlowShape<Integer, Integer>, BoxedUnit> graphFlow = ...;
// This no longer works!
Flow<Integer, Integer, BoxedUnit> flow = Flow.wrap(graphFlow);
// This no longer works!
Flow.wrap(Sink.<Integer>head(), Source.single(0), Keep.left());
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#flow-wrap
and
::
Graph<BidiShape<Integer, Integer, Integer, Integer>, BoxedUnit> bidiGraph = ...;
// This no longer works!
BidiFlow<Integer, Integer, Integer, Integer, BoxedUnit> bidiFlow = BidiFlow.wrap(bidiGraph);
// This no longer works!
BidiFlow.wrap(flow1, flow2, Keep.both());
Should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#bidi-wrap
Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape``
==================================================================================================================
The input and output ports of these shapes where called ``inlet()`` and ``outlet()`` compared to other shapes that
consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``.
Update procedure
----------------
Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the ports
of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`.
FlowGraph class and builder methods have been renamed
=====================================================
Due to incorrect overlap with the :class:`Flow` concept we renamed the :class:`FlowGraph` class to :class:`GraphDSL`.
There is now only one graph creation method called ``create`` which is analogous to the old ``partial`` method. For
closed graphs now it is explicitly required to return ``ClosedShape`` at the end of the builder block.
Update procedure
----------------
1. Search and replace all occurrences of ``FlowGraph`` with ``GraphDSL``.
2. Replace all occurrences of ``GraphDSL.partial()`` or ``GraphDSL.closed()`` with ``GraphDSL.create()``.
3. Add ``ClosedShape`` as a return value of the builder block if it was ``FlowGraph.closed()`` before.
4. Wrap the closed graph with ``RunnableGraph.fromGraph`` if it was ``FlowGraph.closed()`` before.
Example
^^^^^^^
::
// This no longer works!
FlowGraph.factory().closed(builder -> {
//...
});
// This no longer works!
FlowGraph.factory().partial(builder -> {
//...
return new FlowShape<>(inlet, outlet);
});
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#graph-create
Methods that create Source, Sink, Flow from Graphs have been removed
====================================================================
Previously there were convenience methods available on ``Sink``, ``Source``, ``Flow`` an ``BidiFlow`` to create
these DSL elements from a graph builder directly. Now this requires two explicit steps to reduce the number of overloaded
methods (helps Java 8 type inference) and also reduces the ways how these elements can be created. There is only one
graph creation method to learn (``GraphDSL.create``) and then there is only one conversion method to use ``fromGraph()``.
This means that the following methods have been removed:
- ``adapt()`` method on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (both DSLs)
- ``apply()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Scala DSL)
- ``create()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Java DSL)
Update procedure
----------------
Everywhere where ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` is created from a graph using a builder have to
be replaced with two steps
1. Create a ``Graph`` with the correct ``Shape`` using ``GraphDSL.create`` (e.g.. for ``Source`` it means first
creating a ``Graph`` with ``SourceShape``)
2. Create the required DSL element by calling ``fromGraph()`` on the required DSL element (e.g. ``Source.fromGraph``)
passing the graph created in the previous step
Example
^^^^^^^
::
// This no longer works!
Source.factory().create(builder -> {
//...
return outlet;
});
// This no longer works!
Sink.factory().create(builder -> {
//...
return inlet;
});
// This no longer works!
Flow.factory().create(builder -> {
//...
return new Pair<>(inlet, outlet);
});
// This no longer works!
BidiFlow.factory().create(builder -> {
//...
return new BidiShape<>(inlet1, outlet1, inlet2, outlet2);
});
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#graph-create-2
Some graph Builder methods have been removed
============================================
Due to the high number of overloads Java 8 type inference suffered, and it was also hard to figure out which time
to use which method. Therefore various redundant methods have been removed. As a consequence, every ``Sink``, ``Source``
and ``Flow`` needs to be explicitly added via ``builder.add()``.
Update procedure
----------------
1. All uses of ``builder.edge(outlet,inlet)`` should be replaced by the alternative ``builder.from(outlet).toInlet(inlet)``
3. All uses of ``builder.source`` should be replaced by ``builder.from(builder.add(source))``
4. All uses of ``builder.flow`` should be replaced by ``builder.….via(builder.add(flow))``
5. All uses of ``builder.sink`` should be replaced by ``builder.….to(builder.add(sink)))``
::
FlowGraph.factory().closed(builder -> {
// These no longer work
builder.edge(outlet, inlet);
builder.flow(outlet, flow, inlet);
builder.source(Source.single(0));
builder.sink(Sink.<Integer>head());
//...
});
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#graph-builder
Source constructor name changes
===============================
``Source.lazyEmpty`` has been replaced by ``Source.maybe`` which returns a ``Promise`` that can be completed by one or
zero elements by providing an ``Option``. This is different from ``lazyEmpty`` which only allowed completion to be
sent, but no elements.
The ``from()`` overload on ``Source`` has been refactored to separate methods to reduce the number of overloads and
make source creation more discoverable.
``Source.subscriber`` has been renamed to ``Source.asSubscriber``.
Update procedure
----------------
1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with
a ``None`` (an empty ``Option``)
2. Replace all uses of ``Source.from(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)``
3. Replace all uses of ``Source.from(publisher)`` with the method ``Source.fromPublisher(publisher)``
4. Replace all uses of ``Source.from(future)`` with the method ``Source.fromFuture(future))``
5. Replace all uses of ``Source.subscriber`` with the method ``Source.asSubscriber``
Example
^^^^^^^
::
// This no longer works!
Source<Integer, Promise<BoxedUnit>> src = Source.lazyEmpty();
//...
promise.trySuccess(BoxedUnit.UNIT);
// This no longer works!
final Source<String, Cancellable> ticks = Source.from(
FiniteDuration.create(0, TimeUnit.MILLISECONDS),
FiniteDuration.create(200, TimeUnit.MILLISECONDS),
"tick");
// This no longer works!
final Source<Integer, BoxedUnit> pubSource =
Source.from(TestPublisher.<Integer>manualProbe(true, sys));
// This no longer works!
final Source<Integer, BoxedUnit> futSource =
Source.from(Futures.successful(42));
// This no longer works!
final Source<Integer, Subscriber<Integer>> subSource =
Source.<Integer>subscriber();
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#source-creators
Sink constructor name changes
=============================
``Sink.create(subscriber)`` has been renamed to ``Sink.fromSubscriber(subscriber)`` to reduce the number of overloads and
make sink creation more discoverable.
Update procedure
----------------
1. Replace all uses of ``Sink.create(subscriber)`` with the method ``Sink.fromSubscriber(subscriber)``
Example
^^^^^^^
::
// This no longer works!
final Sink<Integer, BoxedUnit> subSink =
Sink.create(TestSubscriber.<Integer>manualProbe(sys));
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#sink-creators
``Flow.empty()`` have been removed
==================================
The ``empty()`` method has been removed since it behaves exactly the same as ``create()``, creating a ``Flow`` with no
transformations added yet.
Update procedure
----------------
1. Replace all uses of ``Flow.empty()`` with ``Flow.create``.
::
// This no longer works!
Flow<Integer, Integer, BoxedUnit> emptyFlow = Flow.<Integer>empty();
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#empty-flow
``flatten(FlattenStrategy)`` has been replaced by named counterparts
====================================================================
To simplify type inference in Java 8 and to make the method more discoverable, ``flatten(FlattenStrategy.concat)``
has been removed and replaced with the alternative method ``flatMapConcat(f)``.
Update procedure
----------------
1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)``
2. Consider replacing ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)``
Example
^^^^^^^
::
Flow.<Source<Integer, BoxedUnit>>create().flatten(FlattenStrategy.concat());
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#flatMapConcat
`Sink.fanoutPublisher() and Sink.publisher() is now a single method`
====================================================================
It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support
a single ``Subscriber``, and the discoverability of the apprpriate fix was non-obvious (Sink.fanoutPublisher).
To make the decision whether to support fanout or not an active one, the aforementioned methods have been
replaced with a single method: ``Sink.asPublisher(fanout: Boolean)``.
Update procedure
----------------
1. Replace all occurrences of ``Sink.publisher`` with ``Sink.asPublisher(false)``
2. Replace all occurrences of ``Sink.fanoutPublisher`` with ``Sink.asPublisher(true)``
Example
^^^^^^^
::
// This no longer works!
final Sink<Integer, Publisher<Integer>> pubSink =
Sink.<Integer>publisher();
// This no longer works!
final Sink<Integer, Publisher<Integer>> pubSink =
Sink.<Integer>fanoutPublisher(2, 8);
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#sink-as-publisher
FlexiMerge an FlexiRoute has been replaced by GraphStage
========================================================
The ``FlexiMerge`` and ``FlexiRoute`` DSLs have been removed since they provided an abstraction that was too limiting
and a better abstraction have been created which is called ``GraphStage``. ``GraphStage`` can express fan-in and
fan-out stages, but many other constructs as well with possibly multiple input and output ports (e.g. a ``BidiStage``).
This new abstraction provides a more uniform way to crate custom stream processing stages of arbitrary ``Shape``. In
fact, all of the built-in fan-in and fan-out stages are now implemented in terms of ``GraphStage``.
Update procedure
----------------
*There is no simple update procedure. The affected stages must be ported to the new ``GraphStage`` DSL manually. Please
read the* ``GraphStage`` *documentation (TODO) for details.*
GroupBy, SplitWhen and SplitAfter now return SubFlow or SubSource
=================================================================
Previously the ``groupBy``, ``splitWhen``, and ``splitAfter``
combinators returned a type that included a :class:`Source` within its
elements. Transforming these substreams was only possible by nesting
the respective combinators inside a ``map`` of the outer stream.
While this design enabled maximum flexibility for handling substreams,
it ultimately made it too easy to create a (potentially suprising)
deadlock. You can read more in `SubFlow-Motivation-Thread`_.
These operations have been made more convenient and also safer by
dropping down into transforming the substreams instead: the return
type is now a :class:`SubFlow` that does not implement the
:class:`Graph` interface and therefore only represents an unfinished
intermediate builder step. The substream mode can be ended by closing
the substreams (i.e. attaching a :class:`Sink`) or merging them back
together.
.. _SubFlow-Motivation-Thread: https://groups.google.com/d/msg/akka-user/_blLOcIHxJ4/i1DOoylmEgAJ
Update Procedure
----------------
The transformations that were done on the substreams need to be lifted
up one level. This only works for cases where the processing topology
is homogenous for all substreams. If your substream processing
topology is heterogeneous, consider creating a graph (see
:ref:`stream-graph-java`).
Example
^^^^^^^
::
Flow.<Integer> create()
// This no longer works!
.groupBy(i -> i % 2)
// This no longer works!
.map(pair -> pair.second().map(i -> i + 3))
// This no longer works!
.flatten(FlattenStrategy.concat())
This is implemented now as
.. includecode:: ../code/docs/stream/MigrationsJava.java#group-flatten
Example 2
^^^^^^^^^
::
Flow.<String> create()
// This no longer works!
.groupBy(i -> i)
// This no longer works!
.map(pair ->
pair.second().runFold(new Pair<>(pair.first(), 0),
(pair, word) -> new Pair<>(word, pair.second() + 1)))
// This no longer works!
.mapAsyncUnordered(4, i -> i)
This is implemented now as
.. includecode:: ../code/docs/stream/MigrationsJava.java#group-fold
Semantic change in ``isHoldingUpstream`` in the DetachedStage DSL
=================================================================
The ``isHoldingUpstream`` method used to return true if the upstream port was in holding state and a completion arrived
(inside the ``onUpstreamFinished`` callback). Now it returns ``false`` when the upstream is completed.
Update procedure
----------------
1. Those stages that relied on the previous behavior need to introduce an extra ``Boolean`` field with initial value
``false``
2. This field must be set on every call to ``holdUpstream()`` (and variants).
3. In completion, instead of calling ``isHoldingUpstream`` read this variable instead.
See the example in the AsyncStage migration section for an example of this procedure.
StatefulStage has been replaced by GraphStage
=============================================
The :class:`StatefulStage` class had some flaws and limitations, most notably around completion handling which
caused subtle bugs. The new :class:`GraphStage` (:ref:`graphstage-java`) solves these issues and should be used
instead.
Update procedure
----------------
There is no mechanical update procedure available. Please consult the :class:`GraphStage` documentation
(:ref:`graphstage-java`).
AsyncStage has been replaced by GraphStage
==========================================
Due to its complexity and inflexibility ``AsyncStage`` have been removed in favor of ``GraphStage``. Existing
``AsyncStage`` implementations can be ported in a mostly mechanical way.
Update procedure
----------------
1. The subclass of ``AsyncStage`` should be replaced by ``GraphStage``
2. The new subclass must define an ``in`` and ``out`` port (``Inlet`` and ``Outlet`` instance) and override the ``shape``
method returning a ``FlowShape``
3. An instance of ``GraphStageLogic`` must be returned by overriding ``createLogic()``. The original processing logic and
state will be encapsulated in this ``GraphStageLogic``
4. Using ``setHandler(port, handler)`` and ``InHandler`` instance should be set on ``in`` and an ``OutHandler`` should
be set on ``out``
5. ``onPush``, ``onUpstreamFinished`` and ``onUpstreamFailed`` are now available in the ``InHandler`` subclass created
by the user
6. ``onPull`` and ``onDownstreamFinished`` are now available in the ``OutHandler`` subclass created by the user
7. the callbacks above no longer take an extra `ctxt` context parameter.
8. ``onPull`` only signals the stage, the actual element can be obtained by calling ``grab(in)``
9. ``ctx.push(elem)`` is now ``push(out, elem)``
10. ``ctx.pull()`` is now ``pull(in)``
11. ``ctx.finish()`` is now ``completeStage()``
12. ``ctx.pushAndFinish(elem)`` is now simply two calls: ``push(out, elem); completeStage()``
13. ``ctx.fail(cause)`` is now ``failStage(cause)``
14. ``ctx.isFinishing()`` is now ``isClosed(in)``
15. ``ctx.absorbTermination()`` can be replaced with ``if (isAvailable(shape.outlet)) <call the onPull() handler>``
16. ``ctx.pushAndPull(elem)`` can be replaced with ``push(out, elem); pull(in)``
17. ``ctx.holdUpstreamAndPush`` and ``context.holdDownstreamAndPull`` can be replaced by simply ``push(elem)`` and
``pull()`` respectively
18. The following calls should be removed: ``ctx.ignore()``, ``ctx.holdUpstream()`` and ``ctx.holdDownstream()``.
19. ``ctx.isHoldingUpstream()`` can be replaced with ``isAvailable(out)``
20. ``ctx.isHoldingDowntream()`` can be replaced with ``!(isClosed(in) || hasBeenPulled(in))``
21. ``ctx.getAsyncCallback()`` is now ``getAsyncCallback(callback)`` which now takes a callback as a parameter. This
would correspond to the ``onAsyncInput()`` callback in the original ``AsyncStage``
We show the necessary steps in terms of an example ``AsyncStage``
Example
^^^^^^^
TODO
Akka HTTP: Uri parsing mode relaxed-with-raw-query replaced with rawQueryString
===============================================================================
Previously Akka HTTP allowed to configure the parsing mode of an Uri's Query part (``?a=b&c=d``) to ``relaxed-with-raw-query``
which is useful when Uris are not formatted using the usual "key/value pairs" syntax.
Instead of exposing it as an option for the parser, this is now available as the ``Option<String> rawQueryString()``
/ ``Option<String> queryString()`` methods on on ``model.Uri``.
For parsing the Query part use ``Query query(Charset charset, Uri.ParsingMode mode)``.
Update procedure
----------------
1. If the ``uri-parsing-mode`` was set to ``relaxed-with-raw-query``, remove it
2. In places where the query string was accessed in ``relaxed-with-raw-query`` mode, use the ``rawQueryString``/``queryString`` methods instead
3. In places where the parsed query parts (such as ``parameter``) were used, invoke parsing directly using ``uri.query().get("a")``
Example
^^^^^^^
::
// config, no longer works
akka.http.parsing.uri-parsing-mode = relaxed-with-raw-query
should be replaced by:
.. includecode:: ../code/docs/stream/MigrationsJava.java#raw-query
And use of query parameters from ``Uri`` that looked like this:
::
// This no longer works!
uri.parameter("name");
should be replaced by:
.. includecode:: ../code/docs/stream/MigrationsJava.java#query-param
SynchronousFileSource and SynchronousFileSink
=============================================
Both have been replaced by ``FileIO.toFile(…)`` and ``FileIO.fromFile(…)`` due to discoverability issues
paired with names which leaked internal implementation details.
Update procedure
----------------
Replace ``SynchronousFileSource.create(`` with ``FileIO.fromFile(``
Replace ``SynchronousFileSink.create(`` with ``FileIO.toFile(``
Replace ``SynchronousFileSink.appendTo(f)`` with ``FileIO.toFile(f, true)``
Example
^^^^^^^
::
// This no longer works!
final Source<ByteString, Future<java.lang.Long>> src =
SynchronousFileSource.create(new File("."));
// This no longer works!
final Source<ByteString, Future<java.lang.Long>> src =
SynchronousFileSource.create(new File("."), 1024);
// This no longer works!
final Sink<ByteString, Future<java.lang.Long>> sink =
`SynchronousFileSink.appendTo(new File("."));
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#file-source-sink
InputStreamSource and OutputStreamSink
======================================
Both have been replaced by ``StreamConverters.fromInputStream(…)`` and ``StreamConverters.fromOutputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``InputStreamSource.create(`` with ``StreamConverters.fromInputStream(``
Replace ``OutputStreamSink.create(`` with ``StreamConverters.fromOutputStream(``
Example
^^^^^^^
::
// This no longer works!
final Source<ByteString, Future<java.lang.Long>> inputStreamSrc =
InputStreamSource.create(new Creator<InputStream>(){
public InputStream create() {
return new SomeInputStream();
}
});
// This no longer works!
final Source<ByteString, Future<java.lang.Long>> otherInputStreamSrc =
InputStreamSource.create(new Creator<InputStream>(){
public InputStream create() {
return new SomeInputStream();
}
}, 1024);
// This no longer works!
final Sink<ByteString, Future<java.lang.Long>> outputStreamSink =
OutputStreamSink.create(new Creator<OutputStream>(){
public OutputStream create() {
return new SomeOutputStream();
}
})
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#input-output-stream-source-sink
OutputStreamSource and InputStreamSink
======================================
Both have been replaced by ``StreamConverters.asOutputStream(…)`` and ``StreamConverters.asInputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``OutputStreamSource.create(`` with ``StreamConverters.asOutputStream(``
Replace ``InputStreamSink.create(`` with ``StreamConverters.asInputStream(``
Example
^^^^^^^
::
// This no longer works!
final Source<ByteString, OutputStream> outputStreamSrc =
OutputStreamSource.create();
// This no longer works!
final Source<ByteString, OutputStream> otherOutputStreamSrc =
OutputStreamSource.create(timeout);
// This no longer works!
final Sink<ByteString, InputStream> someInputStreamSink =
InputStreamSink.create();
// This no longer works!
final Sink<ByteString, InputStream> someOtherInputStreamSink =
InputStreamSink.create(timeout);
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsJava.java#output-input-stream-source-sink

View file

@ -0,0 +1,75 @@
.. _migration-streams-2.0-2.4-java:
##############################
Migration Guide 2.0.x to 2.4.x
##############################
General notes
=============
akka.Done and akka.NotUsed replacing Unit and BoxedUnit
-------------------------------------------------------
To provide more clear signatures and have a unified API for both
Java and Scala two new types have been introduced:
``akka.NotUsed`` is meant to be used instead of ``Unit`` in Scala
and ``BoxedUnit`` in Java to signify that the type parameter is required
but not actually used. This is commonly the case with ``Source``, ``Flow`` and ``Sink``
that do not materialize into any value.
``akka.Done`` is added for the use case where it is boxed inside another object to signify
completion but there is no actual value attached to the completion. It is used to replace
occurrences of ``Future<BoxedUnit>`` with ``Future<Done>`` in Java and ``Future[Unit]`` with
``Future[Done]`` in Scala.
All previous usage of ``Unit`` and ``BoxedUnit`` for these two cases in the akka streams APIs
has been updated.
This means that Java code like this::
Source<String, BoxedUnit> source = Source.from(Arrays.asList("1", "2", "3"));
Sink<String, Future<BoxedUnit>> sink = Sink.ignore();
needs to be changed into::
Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));
Sink<String, Future<Done>> sink = Sink.ignore();
These changes apply to all the places where streams are used, which means that signatures
in the persistent query APIs also are affected.
Changed Operators
=================
``expand()`` is now based on an Iterator
----------------------------------------
Previously the ``expand`` combinator required two functions as input: the first
one lifted incoming values into an extrapolation state and the second one
extracted values from that, possibly evolving that state. This has been
simplified into a single function that turns the incoming element into an
Iterator.
The most prominent use-case previously was to just repeat the previously received value::
// This no longer works!
Flow.of(Integer.class).expand(i -> i)(i -> new Pair<>(i, i));
In Akka 2.4.x this is simplified to:
.. includecode:: ../code/docs/stream/MigrationsJava.java#expand-continually
If state needs to be be kept during the expansion process then this state will
need to be managed by the Iterator. The example of counting the number of
expansions might previously have looked like::
// This no longer works!
Flow.of(Integer.class).expand(i -> new Pair<>(i, 0))(
pair -> new Pair<>(new Pair<>(pair.first(), pair.second()),
new Pair<>(pair.first(), pair.second() + 1)));
In Akka 2.4.x this is formulated like so:
.. includecode:: ../code/docs/stream/MigrationsJava.java#expand-state

View file

@ -1,52 +0,0 @@
.. _migration-streams-2.0.x-2.4.x:
###########################################
Migration Guide Akka Streams 2.0.x to 2.4.x
###########################################
General notes
=============
akka.Done and akka.NotUsed replacing Unit and BoxedUnit
-------------------------------------------------------
To provide more clear signatures and have a unified API for both
Java and Scala two new types have been introduced:
``akka.NotUsed`` is meant to be used instead of ``Unit`` in Scala
and ``BoxedUnit`` in Java to signify that the type parameter is required
but not actually used. This is commonly the case with ``Source``s, ``Flow``s and ``Sink``s
that do not materialize into any value.
``akka.Done`` is added for the use case where it is boxed inside another object to signify
completion but there is no actual value attached to the completion. It is used to replace
occurrences of ``Future<BoxedUnit>`` with ``Future<Done>`` in Java and ``Future[Unit]`` with
``Future[Done]`` in Scala.
All previous usage of ``Unit`` and ``BoxedUnit`` for these two cases in the akka streams APIs
has been updated.
This means that Java code like this::
Source<String, BoxedUnit> source = Source.from(Arrays.asList("1", "2", "3"));
Sink<String, Future<BoxedUnit>> sink = Sink.ignore()
needs to be changed into::
Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));
Sink<String, Future<Done>> sink = Sink.ignore()
And Scala code like this::
Source[Int, Unit] source = Source.from(1 to 5)
Sink[Int, Future[Unit]] sink = Sink.ignore()
needs to be changed into::
Source[Int, NotUsed] source = Source.from(1 to 5)
Sink[Int, Future[Done]] sink = Sink.ignore()
These changes apply to all the places where streams are used, which means that signatures
in the persistent query APIs also are affected.

View file

@ -14,4 +14,3 @@ Migration Guides
migration-guide-eventsourced-2.3.x migration-guide-eventsourced-2.3.x
migration-guide-2.3.x-2.4.x migration-guide-2.3.x-2.4.x
migration-guide-2.4.x-2.5.x migration-guide-2.4.x-2.5.x
migration-streams-2.0.x-2.4.x

View file

@ -3,280 +3,26 @@
*/ */
package docs.stream package docs.stream
import java.io.File import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import _root_.akka.http.scaladsl.model.Uri
import _root_.akka.stream._
import _root_.akka.stream.scaladsl._
import _root_.akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import _root_.akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber }
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.{ Failure, Random, Success, Try }
class MigrationsScala extends AkkaSpec { class MigrationsScala extends AkkaSpec {
"Examples in migration guide" must { "Examples in migration guide" must {
"compile" in { "compile" in {
val flow1 = Flow[Int]
val flow2 = Flow[Int]
def inlet: Inlet[Int] = ???
def outlet: Outlet[Int] = ???
def inlet1: Inlet[Int] = ???
def outlet1: Outlet[Int] = ???
def inlet2: Inlet[Int] = ???
def outlet2: Outlet[Int] = ???
lazy val dontExecuteMe = { lazy val dontExecuteMe = {
//#flow-wrap //#expand-continually
val graphSource: Graph[SourceShape[Int], Unit] = ??? Flow[Int].expand(Iterator.continually(_))
val source: Source[Int, Unit] = Source.fromGraph(graphSource) //#expand-continually
//#expand-state
val graphSink: Graph[SinkShape[Int], Unit] = ??? Flow[Int].expand(i => {
val sink: Sink[Int, Unit] = Sink.fromGraph(graphSink) var state = 0
Iterator.continually({
val graphFlow: Graph[FlowShape[Int, Int], Unit] = ??? state += 1
val flow: Flow[Int, Int, Unit] = Flow.fromGraph(graphFlow) (i, state)
Flow.fromSinkAndSource(Sink.head[Int], Source.single(0))
//#flow-wrap
//#bidiflow-wrap
val bidiGraph: Graph[BidiShape[Int, Int, Int, Int], Unit] = ???
val bidi: BidiFlow[Int, Int, Int, Int, Unit] = BidiFlow.fromGraph(bidiGraph)
BidiFlow.fromFlows(flow1, flow2)
BidiFlow.fromFunctions((x: Int) => x + 1, (y: Int) => y * 3)
//#bidiflow-wrap
//#graph-create
// Replaces GraphDSL.closed()
GraphDSL.create() { builder =>
//...
ClosedShape
}
// Replaces GraphDSL.partial()
GraphDSL.create() { builder =>
//...
FlowShape(inlet, outlet)
}
//#graph-create
//#graph-create-2
Source.fromGraph(
GraphDSL.create() { builder =>
//...
SourceShape(outlet)
}) })
Sink.fromGraph(
GraphDSL.create() { builder =>
//...
SinkShape(inlet)
}) })
//#expand-state
Flow.fromGraph(
GraphDSL.create() { builder =>
//...
FlowShape(inlet, outlet)
})
BidiFlow.fromGraph(
GraphDSL.create() { builder =>
//...
BidiShape(inlet1, outlet1, inlet2, outlet2)
})
//#graph-create-2
//#graph-edges
RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
outlet ~> inlet
outlet ~> flow ~> inlet
//...
ClosedShape
})
//#graph-edges
val promise = Promise[Unit]()
//#source-creators
val src: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
//...
// This finishes the stream without emitting anything, just like Source.lazyEmpty did
promise.trySuccess(Some(()))
val ticks = Source.tick(1.second, 3.seconds, "tick")
val pubSource = Source.fromPublisher(TestPublisher.manualProbe[Int]())
val itSource = Source.fromIterator(() => Iterator.continually(Random.nextGaussian))
val futSource = Source.fromFuture(Future.successful(42))
val subSource = Source.asSubscriber
//#source-creators
//#sink-creators
val subSink = Sink.fromSubscriber(TestSubscriber.manualProbe[Int]())
//#sink-creators
//#sink-as-publisher
val pubSink = Sink.asPublisher(fanout = false)
val pubSinkFanout = Sink.asPublisher(fanout = true)
//#sink-as-publisher
//#flatMapConcat
Flow[Source[Int, Any]].flatMapConcat(identity)
//#flatMapConcat
//#group-flatten
Flow[Int]
.groupBy(2, _ % 2) // the first parameter sets max number of substreams
.map(_ + 3)
.concatSubstreams
//#group-flatten
val MaxDistinctWords = 1000
//#group-fold
Flow[String]
.groupBy(MaxDistinctWords, identity)
.fold(("", 0))((pair, word) => (word, pair._2 + 1))
.mergeSubstreams
//#group-fold
//#port-async
class MapAsyncOne[In, Out](f: In Future[Out])(implicit ec: ExecutionContext)
extends GraphStage[FlowShape[In, Out]] {
val in: Inlet[In] = Inlet("MapAsyncOne.in")
val out: Outlet[Out] = Outlet("MapAsyncOne.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
// The actual logic is encapsulated in a GraphStageLogic now
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// All of the state *must* be encapsulated in the GraphStageLogic,
// not in the GraphStage
private var elemInFlight: Out = _
val callback = getAsyncCallback(onAsyncInput)
var holdingUpstream = false
// All upstream related events now are handled in an InHandler instance
setHandler(in, new InHandler {
// No context or element parameter for onPush
override def onPush(): Unit = {
// The element is not passed as an argument but needs to be dequeued explicitly
val elem = grab(in)
val future = f(elem)
future.onComplete(callback.invoke)
// ctx.holdUpstream is no longer needed, but we need to track the state
holdingUpstream = true
}
// No context parameter
override def onUpstreamFinish(): Unit = {
if (holdingUpstream) absorbTermination()
else completeStage() // ctx.finish turns into completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (elemInFlight != null) {
val e = elemInFlight
elemInFlight = null.asInstanceOf[Out]
pushIt(e)
} // holdDownstream is no longer needed
}
})
// absorbTermination turns into the code below.
// This emulates the behavior of the AsyncStage stage.
private def absorbTermination(): Unit =
if (isAvailable(shape.out)) getHandler(out).onPull()
// The line below emulates the behavior of the AsyncStage holdingDownstream
private def holdingDownstream(): Boolean =
!(isClosed(in) || hasBeenPulled(in))
// Any method can be used as a callback, we chose the previous name for
// easier comparison with the original code
private def onAsyncInput(input: Try[Out]) =
input match {
case Failure(ex) failStage(ex)
case Success(e) if holdingDownstream() pushIt(e)
case Success(e)
elemInFlight = e
// ctx.ignore is no longer needed
}
private def pushIt(elem: Out): Unit = {
// ctx.isFinishing turns into isClosed(in)
if (isClosed(in)) {
// pushAndFinish is now two actions
push(out, elem)
completeStage()
} else {
// pushAndPull is now two actions
push(out, elem)
pull(in)
holdingUpstream = false
}
}
}
}
//#port-async
val uri: Uri = ???
//#raw-query
val queryPart: Option[String] = uri.rawQueryString
//#raw-query
//#query-param
val param: Option[String] = uri.query().get("a")
//#query-param
//#file-source-sink
val fileSrc = FileIO.fromFile(new File("."))
val otherFileSrc = FileIO.fromFile(new File("."), 1024)
val someFileSink = FileIO.toFile(new File("."))
//#file-source-sink
class SomeInputStream extends java.io.InputStream { override def read(): Int = 0 }
class SomeOutputStream extends java.io.OutputStream { override def write(b: Int): Unit = () }
//#input-output-stream-source-sink
val inputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream())
val otherInputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream())
val someOutputStreamSink = StreamConverters.fromOutputStream(() => new SomeOutputStream())
//#input-output-stream-source-sink
//#output-input-stream-source-sink
val timeout: FiniteDuration = 0.seconds
val outputStreamSrc = StreamConverters.asOutputStream()
val otherOutputStreamSrc = StreamConverters.asOutputStream(timeout)
val someInputStreamSink = StreamConverters.asInputStream()
val someOtherInputStreamSink = StreamConverters.asInputStream(timeout)
//#output-input-stream-source-sink
} }
} }
} }

View file

@ -23,3 +23,4 @@ Streams
stream-cookbook stream-cookbook
../../general/stream/stream-configuration ../../general/stream/stream-configuration
migration-guide-1.0-2.x-scala migration-guide-1.0-2.x-scala
migration-guide-2.0-2.4-scala

View file

@ -4,746 +4,6 @@
Migration Guide 1.0 to 2.x Migration Guide 1.0 to 2.x
########################## ##########################
The 2.0 release contains some structural changes that require some For this migration guide see `the documentation for Akka Streams 2.0`_.
simple, mechanical source-level changes in client code. While these are detailed below,
there is another change that may have an impact on the runtime behavior of your streams
and which therefore is listed first.
Operator Fusion is on by default .. _`the documentation for Akka Streams 2.0`: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/migration-guide-1.0-2.x-scala.html
================================
Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that
the processing steps of a flow or stream graph can be executed within the same Actor and has three
consequences:
* starting up a stream may take longer than before due to executing the fusion algorithm
* passing elements from one processing stage to the next is a lot faster between fused
stages due to avoiding the asynchronous messaging overhead
* fused stream processing stages do no longer run in parallel to each other, meaning that
only up to one CPU core is used for each fused part
The first point can be countered by pre-fusing and then reusing a stream blueprint, see ``akka.stream.Fusing``.
In order to balance the effects of the second and third bullet points you will have to insert asynchronous
boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that
shall communicate with the rest of the graph in an asynchronous fashion.
.. warning::
Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer
that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers
may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer
there, data elements are passed without buffering between fused stages. In those cases where buffering
is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the
``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function.
The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``.
In that case you can still manually fuse those graphs which shall run on less Actors. Fusable elements are
* all GraphStages (this includes all built-in junctions apart from ``groupBy``)
* all Stages (this includes all built-in linear operators)
* TCP connections
Introduced proper named constructor methods instead of ``wrap()``
=================================================================
There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of
the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing
the number of overloads) and finding relevant methods in the documentation.
Creating a Flow from other stages
---------------------------------
It was possible to create a ``Flow`` from a graph with the correct shape (``FlowShape``) using ``wrap()``. Now this
must be done with the more descriptive method ``Flow.fromGraph()``.
It was possible to create a ``Flow`` from a ``Source`` and a ``Sink`` using ``wrap()``. Now this functionality can
be accessed trough the more descriptive methods ``Flow.fromSinkAndSource`` and ``Flow.fromSinkAndSourceMat``.
Creating a BidiFlow from other stages
-------------------------------------
It was possible to create a ``BidiFlow`` from a graph with the correct shape (``BidiShape``) using ``wrap()``. Now this
must be done with the more descriptive method ``BidiFlow.fromGraph()``.
It was possible to create a ``BidiFlow`` from two ``Flow`` s using ``wrap()``. Now this functionality can
be accessed trough the more descriptive methods ``BidiFlow.fromFlows`` and ``BidiFlow.fromFlowsMat``.
It was possible to create a ``BidiFlow`` from two functions using ``apply()`` (Scala DSL) or ``create()`` (Java DSL).
Now this functionality can be accessed trough the more descriptive method ``BidiFlow.fromFunctions``.
Update procedure
----------------
1. Replace all uses of ``Flow.wrap`` when it converts a ``Graph`` to a ``Flow`` with ``Flow.fromGraph``
2. Replace all uses of ``Flow.wrap`` when it converts a ``Source`` and ``Sink`` to a ``Flow`` with
``Flow.fromSinkAndSource`` or ``Flow.fromSinkAndSourceMat``
3. Replace all uses of ``BidiFlow.wrap`` when it converts a ``Graph`` to a ``BidiFlow`` with ``BidiFlow.fromGraph``
4. Replace all uses of ``BidiFlow.wrap`` when it converts two ``Flow`` s to a ``BidiFlow`` with
``BidiFlow.fromFlows`` or ``BidiFlow.fromFlowsMat``
5. Replace all uses of ``BidiFlow.apply()`` when it converts two
functions to a ``BidiFlow`` with ``BidiFlow.fromFunctions``
Example
^^^^^^^
::
val graphSource: Graph[SourceShape[Int], Unit] = ???
// This no longer works!
val source: Source[Int, Unit] = Source.wrap(graphSource)
val graphSink: Graph[SinkShape[Int], Unit] = ???
// This no longer works!
val sink: Sink[Int, Unit] = Sink.wrap(graphSink)
val graphFlow: Graph[FlowShape[Int, Int], Unit] = ???
// This no longer works!
val flow: Flow[Int, Int, Unit] = Flow.wrap(graphFlow)
// This no longer works
Flow.wrap(Sink.head[Int], Source.single(0))(Keep.left)
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#flow-wrap
and
::
val bidiGraph: Graph[BidiShape[Int, Int, Int, Int], Unit = ???
// This no longer works!
val bidi: BidiFlow[Int, Int, Int, Int, Unit] = BidiFlow.wrap(bidiGraph)
// This no longer works!
BidiFlow.wrap(flow1, flow2)(Keep.both)
// This no longer works!
BidiFlow((x: Int) => x + 1, (y: Int) => y * 3)
Should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#bidiflow-wrap
FlowGraph class and builder methods have been renamed
=====================================================
Due to incorrect overlap with the :class:`Flow` concept we renamed the :class:`FlowGraph` class to :class:`GraphDSL`.
There is now only one graph creation method called ``create`` which is analogous to the old ``partial`` method. For
closed graphs now it is explicitly required to return ``ClosedShape`` at the end of the builder block.
Update procedure
----------------
1. Search and replace all occurrences of ``FlowGraph`` with ``GraphDSL``.
2. Replace all occurrences of ``GraphDSL.partial()`` or ``GraphDSL.closed()`` with ``GraphDSL.create()``.
3. Add ``ClosedShape`` as a return value of the builder block if it was ``FlowGraph.closed()`` before.
4. Wrap the closed graph with ``RunnableGraph.fromGraph`` if it was ``FlowGraph.closed()`` before.
Example
^^^^^^^
::
// This no longer works!
FlowGraph.closed() { builder =>
//...
}
// This no longer works!
FlowGraph.partial() { builder =>
//...
FlowShape(inlet, outlet)
}
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#graph-create
Methods that create Source, Sink, Flow from Graphs have been removed
====================================================================
Previously there were convenience methods available on ``Sink``, ``Source``, ``Flow`` an ``BidiFlow`` to create
these DSL elements from a graph builder directly. Now this requires two explicit steps to reduce the number of overloaded
methods (helps Java 8 type inference) and also reduces the ways how these elements can be created. There is only one
graph creation method to learn (``GraphDSL.create``) and then there is only one conversion method to use ``fromGraph()``.
This means that the following methods have been removed:
- ``adapt()`` method on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (both DSLs)
- ``apply()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Scala DSL)
- ``create()`` overloads providing a graph ``Builder`` on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (Java DSL)
Update procedure
----------------
Everywhere where ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` is created from a graph using a builder have to
be replaced with two steps
1. Create a ``Graph`` with the correct ``Shape`` using ``GraphDSL.create`` (e.g.. for ``Source`` it means first
creating a ``Graph`` with ``SourceShape``)
2. Create the required DSL element by calling ``fromGraph()`` on the required DSL element (e.g. ``Source.fromGraph``)
passing the graph created in the previous step
Example
^^^^^^^
::
// This no longer works!
Source() { builder =>
//...
outlet
}
// This no longer works!
Sink() { builder =>
//...
inlet
}
// This no longer works!
Flow() { builder =>
//...
(inlet, outlet)
}
// This no longer works!
BidiFlow() { builder =>
//...
BidiShape(inlet1, outlet1, inlet2, outlet2)
}
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#graph-create-2
Several Graph builder methods have been removed
===============================================
The ``addEdge`` methods have been removed from the DSL to reduce the ways connections can be made and to reduce the
number of overloads. Now only the ``~>`` notation is available which requires the import of the implicits
``GraphDSL.Implicits._``.
Update procedure
----------------
1. Replace all uses of ``scaladsl.Builder.addEdge(Outlet, Inlet)`` by the graphical DSL ``~>``.
2. Replace all uses of ``scaladsl.Builder.addEdge(Outlet, FlowShape, Inlet)`` by the graphical DSL ``~>``.
methods, or the graphical DSL ``~>``.
3. Import ``FlowGraph.Implicits._`` in the builder block or an enclosing scope.
Example
^^^^^^^
::
FlowGraph.closed() { builder =>
//...
// This no longer works!
builder.addEdge(outlet, inlet)
// This no longer works!
builder.addEdge(outlet, flow1, inlet)
//...
}
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#graph-edges
Source constructor name changes
===============================
``Source.lazyEmpty`` has been replaced by ``Source.maybe`` which returns a ``Promise`` that can be completed by one or
zero elements by providing an ``Option``. This is different from ``lazyEmpty`` which only allowed completion to be
sent, but no elements.
The ``apply()`` overload on ``Source`` has been refactored to separate methods to reduce the number of overloads and
make source creation more discoverable.
``Source.subscriber`` has been renamed to ``Source.asSubscriber``.
Update procedure
----------------
1. All uses of ``Source.lazyEmpty`` should be replaced by ``Source.maybe`` and the returned ``Promise`` completed with
a ``None`` (an empty ``Option``)
2. Replace all uses of ``Source(delay,interval,tick)`` with the method ``Source.tick(delay,interval,tick)``
3. Replace all uses of ``Source(publisher)`` with the method ``Source.fromPublisher(publisher)``
4. Replace all uses of ``Source(() => iterator)`` with the method ``Source.fromIterator(() => iterator))``
5. Replace all uses of ``Source(future)`` with the method ``Source.fromFuture(future))``
6. Replace all uses of ``Source.subscriber`` with the method ``Source.asSubscriber``
Example
^^^^^^^
::
// This no longer works!
val src: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]
//...
promise.trySuccess(())
// This no longer works!
val ticks = Source(1.second, 3.seconds, "tick")
// This no longer works!
val pubSource = Source(TestPublisher.manualProbe[Int]())
// This no longer works!
val itSource = Source(() => Iterator.continually(Random.nextGaussian))
// This no longer works!
val futSource = Source(Future.successful(42))
// This no longer works!
val subSource = Source.subscriber
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#source-creators
Sink constructor name changes
=============================
``Sink.apply(subscriber)`` has been renamed to ``Sink.fromSubscriber(subscriber)`` to reduce the number of overloads and
make sink creation more discoverable.
Update procedure
----------------
1. Replace all uses of ``Sink(subscriber)`` with the method ``Sink.fromSubscriber(subscriber)``
Example
^^^^^^^
::
// This no longer works!
val subSink = Sink(TestSubscriber.manualProbe[Int]())
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#sink-creators
``flatten(FlattenStrategy)`` has been replaced by named counterparts
====================================================================
To simplify type inference in Java 8 and to make the method more discoverable, ``flatten(FlattenStrategy.concat)``
has been removed and replaced with the alternative method ``flatten(FlattenStrategy.concat)``.
Update procedure
----------------
1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)``
2. Consider replacing all occurrences of ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)``
Example
^^^^^^^
::
// This no longer works!
Flow[Source[Int, Any]].flatten(FlattenStrategy.concat)
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#flatMapConcat
`Sink.fanoutPublisher()` and `Sink.publisher()` is now a single method
======================================================================
It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support
a single ``Subscriber``, and the discoverability of the apprpriate fix was non-obvious (Sink.fanoutPublisher).
To make the decision whether to support fanout or not an active one, the aforementioned methods have been
replaced with a single method: ``Sink.asPublisher(fanout: Boolean)``.
Update procedure
----------------
1. Replace all occurrences of ``Sink.publisher`` with ``Sink.asPublisher(false)``
2. Replace all occurrences of ``Sink.fanoutPublisher`` with ``Sink.asPublisher(true)``
Example
^^^^^^^
::
// This no longer works!
val subSink = Sink.publisher
// This no longer works!
val subSink = Sink.fanoutPublisher(2, 8)
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#sink-as-publisher
FlexiMerge an FlexiRoute has been replaced by GraphStage
========================================================
The ``FlexiMerge`` and ``FlexiRoute`` DSLs have been removed since they provided an abstraction that was too limiting
and a better abstraction have been created which is called ``GraphStage``. ``GraphStage`` can express fan-in and
fan-out stages, but many other constructs as well with possibly multiple input and output ports (e.g. a ``BidiStage``).
This new abstraction provides a more uniform way to crate custom stream processing stages of arbitrary ``Shape``. In
fact, all of the built-in fan-in and fan-out stages are now implemented in terms of ``GraphStage``.
Update procedure
----------------
*There is no simple update procedure. The affected stages must be ported to the new ``GraphStage`` DSL manually. Please
read the* ``GraphStage`` *documentation (TODO) for details.*
GroupBy, SplitWhen and SplitAfter now return SubFlow
====================================================
Previously the ``groupBy``, ``splitWhen``, and ``splitAfter``
combinators returned a type that included a :class:`Source` within its
elements. Transforming these substreams was only possible by nesting
the respective combinators inside a ``map`` of the outer stream.
While this design enabled maximum flexibility for handling substreams,
it ultimately made it too easy to create a (potentially suprising)
deadlock. You can read more in `SubFlow-Motivation-Thread`_.
These operations have been made more convenient and also safer by
dropping down into transforming the substreams instead: the return
type is now a :class:`SubFlow` that does not implement the
:class:`Graph` interface and therefore only represents an unfinished
intermediate builder step. The substream mode can be ended by closing
the substreams (i.e. attaching a :class:`Sink`) or merging them back
together.
.. _SubFlow-Motivation-Thread: https://groups.google.com/d/msg/akka-user/_blLOcIHxJ4/i1DOoylmEgAJ
Update Procedure
----------------
The transformations that were done on the substreams need to be lifted
up one level. This only works for cases where the processing topology
is homogenous for all substreams. If your substream processing
topology is heterogeneous, consider creating a graph (see
:ref:`stream-graph-scala`).
Example
^^^^^^^
::
Flow[Int]
// This no longer works!
.groupBy(_ % 2)
// This no longer works!
.map {
case (key, source) => source.map(_ + 3)
}
// This no longer works!
.flatten(FlattenStrategy.concat)
This is implemented now as
.. includecode:: ../code/docs/stream/MigrationsScala.scala#group-flatten
Example 2
^^^^^^^^^
::
Flow[String]
// This no longer works!
.groupBy(identity)
// This no longer works!
.map {
case (key, source) => source.runFold((key, 0))((pair, word) => (key, pair._2 + 1))
}
// This no longer works!
.mapAsyncUnordered(4, identity)
This is implemented now as
.. includecode:: ../code/docs/stream/MigrationsScala.scala#group-fold
Variance of Inlet and Outlet
============================
Scala uses *declaration site variance* which was cumbersome in the cases of ``Inlet`` and ``Outlet`` as they are
purely symbolic object containing no fields or methods and which are used both in input and output locations (wiring
an ``Outlet`` into an ``Inlet``; reading in a stage from an ``Inlet``). Because of this reasons all users of these
port abstractions now use *use-site variance* (just like Java variance works). This in general does not affect user
code expect the case of custom shapes, which now require ``@uncheckedVariance`` annotations on their ``Inlet`` and
``Outlet`` members (since these are now invariant, but the Scala compiler does not know that they have no fields or
methods that would violate variance constraints)
This change does not affect Java DSL users.
Update procedure
----------------
1. All custom shapes must use ``@uncheckedVariance`` on their ``Inlet`` and ``Outlet`` members.
Renamed ``inlet()`` and ``outlet()`` to ``in()`` and ``out()`` in ``SourceShape``, ``SinkShape`` and ``FlowShape``
==================================================================================================================
The input and output ports of these shapes where called ``inlet()`` and ``outlet()`` compared to other shapes that
consistently used ``in()`` and ``out()``. Now all :class:`Shape` s use ``in()`` and ``out()``.
Update procedure
----------------
Change all references to ``inlet()`` to ``in()`` and all references to ``outlet()`` to ``out()`` when referring to the ports
of :class:`FlowShape`, :class:`SourceShape` and :class:`SinkShape`.
Semantic change in ``isHoldingUpstream`` in the DetachedStage DSL
=================================================================
The ``isHoldingUpstream`` method used to return true if the upstream port was in holding state and a completion arrived
(inside the ``onUpstreamFinished`` callback). Now it returns ``false`` when the upstream is completed.
Update procedure
----------------
1. Those stages that relied on the previous behavior need to introduce an extra ``Boolean`` field with initial value
``false``
2. This field must be set on every call to ``holdUpstream()`` (and variants).
3. In completion, instead of calling ``isHoldingUpstream`` read this variable instead.
See the example in the AsyncStage migration section for an example of this procedure.
StatefulStage has been replaced by GraphStage
=============================================
The :class:`StatefulStage` class had some flaws and limitations, most notably around completion handling which
caused subtle bugs. The new :class:`GraphStage` (:ref:`graphstage-java`) solves these issues and should be used
instead.
Update procedure
----------------
There is no mechanical update procedure available. Please consult the :class:`GraphStage` documentation
(:ref:`graphstage-java`).
AsyncStage has been replaced by GraphStage
==========================================
Due to its complexity and inflexibility ``AsyncStage`` have been removed in favor of ``GraphStage``. Existing
``AsyncStage`` implementations can be ported in a mostly mechanical way.
Update procedure
----------------
1. The subclass of ``AsyncStage`` should be replaced by ``GraphStage``
2. The new subclass must define an ``in`` and ``out`` port (``Inlet`` and ``Outlet`` instance) and override the ``shape``
method returning a ``FlowShape``
3. An instance of ``GraphStageLogic`` must be returned by overriding ``createLogic()``. The original processing logic and
state will be encapsulated in this ``GraphStageLogic``
4. Using ``setHandler(port, handler)`` and ``InHandler`` instance should be set on ``in`` and an ``OutHandler`` should
be set on ``out``
5. ``onPush``, ``onUpstreamFinished`` and ``onUpstreamFailed`` are now available in the ``InHandler`` subclass created
by the user
6. ``onPull`` and ``onDownstreamFinished`` are now available in the ``OutHandler`` subclass created by the user
7. the callbacks above no longer take an extra `ctxt` context parameter.
8. ``onPull`` only signals the stage, the actual element can be obtained by calling ``grab(in)``
9. ``ctx.push(elem)`` is now ``push(out, elem)``
10. ``ctx.pull()`` is now ``pull(in)``
11. ``ctx.finish()`` is now ``completeStage()``
12. ``ctx.pushAndFinish(elem)`` is now simply two calls: ``push(out, elem); completeStage()``
13. ``ctx.fail(cause)`` is now ``failStage(cause)``
14. ``ctx.isFinishing()`` is now ``isClosed(in)``
15. ``ctx.absorbTermination()`` can be replaced with ``if (isAvailable(shape.outlet)) <call the onPull() handler>``
16. ``ctx.pushAndPull(elem)`` can be replaced with ``push(out, elem); pull(in)``
17. ``ctx.holdUpstreamAndPush`` and ``context.holdDownstreamAndPull`` can be replaced by simply ``push(elem)`` and
``pull()`` respectively
18. The following calls should be removed: ``ctx.ignore()``, ``ctx.holdUpstream()`` and ``ctx.holdDownstream()``.
19. ``ctx.isHoldingUpstream()`` can be replaced with ``isAvailable(out)``
20. ``ctx.isHoldingDowntream()`` can be replaced with ``!(isClosed(in) || hasBeenPulled(in))``
21. ``ctx.getAsyncCallback()`` is now ``getAsyncCallback(callback)`` which now takes a callback as a parameter. This
would correspond to the ``onAsyncInput()`` callback in the original ``AsyncStage``
We show the necessary steps in terms of an example ``AsyncStage``
Example
^^^^^^^
::
class MapAsyncOne[In, Out](f: In ⇒ Future[Out])(implicit ec: ExecutionContext)
extends AsyncStage[In, Out, Try[Out]] {
private var elemInFlight: Out = _
override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = {
val future = f(elem)
val cb = ctx.getAsyncCallback
future.onComplete(cb.invoke)
ctx.holdUpstream()
}
override def onPull(ctx: AsyncContext[Out, Try[Out]]) =
if (elemInFlight != null) {
val e = elemInFlight
elemInFlight = null.asInstanceOf[Out]
pushIt(e, ctx)
} else ctx.holdDownstream()
override def onAsyncInput(input: Try[Out], ctx: AsyncContext[Out, Try[Out]]) =
input match {
case Failure(ex) ⇒ ctx.fail(ex)
case Success(e) if ctx.isHoldingDownstream ⇒ pushIt(e, ctx)
case Success(e) ⇒
elemInFlight = e
ctx.ignore()
}
override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]) =
if (ctx.isHoldingUpstream) ctx.absorbTermination()
else ctx.finish()
private def pushIt(elem: Out, ctx: AsyncContext[Out, Try[Out]]) =
if (ctx.isFinishing) ctx.pushAndFinish(elem)
else ctx.pushAndPull(elem)
}
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#port-async
Akka HTTP: Uri parsing mode relaxed-with-raw-query replaced with rawQueryString
===============================================================================
Previously Akka HTTP allowed to configure the parsing mode of an Uri's Query part (``?a=b&c=d``) to ``relaxed-with-raw-query``
which is useful when Uris are not formatted using the usual "key/value pairs" syntax.
Instead of exposing it as an option for the parser, this is now available as the ``rawQueryString(): Option[String]``
/ ``queryString(): Option[String]`` methods on on ``model.Uri``.
For parsing the Query part use ``query(charset: Charset = UTF8, mode: Uri.ParsingMode = Uri.ParsingMode.Relaxed): Query``.
Update procedure
----------------
1. If the ``uri-parsing-mode`` was set to ``relaxed-with-raw-query``, remove it
2. In places where the query string was accessed in ``relaxed-with-raw-query`` mode, use the ``rawQueryString``/``queryString`` methods instead
3. In places where the parsed query parts (such as ``parameter``) were used, invoke parsing directly using ``uri.query().get("a")``
Example
^^^^^^^
::
// config, no longer works
akka.http.parsing.uri-parsing-mode = relaxed-with-raw-query
should be replaced by:
.. includecode:: ../code/docs/stream/MigrationsScala.scala#raw-query
And use of query parameters from ``Uri`` that looked like this:
::
// This no longer works!
uri.parameter("name")
should be replaced by:
.. includecode:: ../code/docs/stream/MigrationsScala.scala#query-param
SynchronousFileSource and SynchronousFileSink
=============================================
``SynchronousFileSource`` and ``SynchronousFileSink``
have been replaced by ``FileIO.read(…)`` and ``FileIO.write(…)`` due to discoverability issues
paired with names which leaked internal implementation details.
Update procedure
----------------
Replace ``SynchronousFileSource(`` and ``SynchronousFileSource.apply(`` with ``FileIO.fromFile(``
Replace ``SynchronousFileSink(`` and ``SynchronousFileSink.apply(`` with ``FileIO.toFile(``
Example
^^^^^^^
::
// This no longer works!
val fileSrc = SynchronousFileSource(new File("."))
// This no longer works!
val otherFileSrc = SynchronousFileSource(new File("."), 1024)
// This no longer works!
val someFileSink = SynchronousFileSink(new File("."))
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#file-source-sink
InputStreamSource and OutputStreamSink
======================================
Both have been replaced by ``StreamConverters.fromInputStream(…)`` and ``StreamConverters.fromOutputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``InputStreamSource(`` and ``InputStreamSource.apply(`` with ``StreamConverters.fromInputStream(``
i
Replace ``OutputStreamSink(`` and ``OutputStreamSink.apply(`` with ``StreamConverters.fromOutputStream(``
Example
^^^^^^^
::
// This no longer works!
val inputStreamSrc = InputStreamSource(() => new SomeInputStream())
// This no longer works!
val otherInputStreamSrc = InputStreamSource(() => new SomeInputStream(), 1024)
// This no longer works!
val someOutputStreamSink = OutputStreamSink(() => new SomeOutputStream())
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#input-output-stream-source-sink
OutputStreamSource and InputStreamSink
======================================
Both have been replaced by ``StreamConverters.asOutputStream(…)`` and ``StreamConverters.asInputStream(…)`` due to discoverability issues.
Update procedure
----------------
Replace ``OutputStreamSource(`` and ``OutputStreamSource.apply(`` with ``StreamConverters.asOutputStream(``
Replace ``InputStreamSink(`` and ``InputStreamSink.apply(`` with ``StreamConverters.asInputStream(``
Example
^^^^^^^
::
// This no longer works!
val outputStreamSrc = OutputStreamSource()
// This no longer works!
val otherOutputStreamSrc = OutputStreamSource(timeout)
// This no longer works!
val someInputStreamSink = InputStreamSink()
// This no longer works!
val someOtherInputStreamSink = InputStreamSink(timeout);
should be replaced by
.. includecode:: ../code/docs/stream/MigrationsScala.scala#output-input-stream-source-sink

View file

@ -0,0 +1,73 @@
.. _migration-streams-2.0-2.4-scala:
##############################
Migration Guide 2.0.x to 2.4.x
##############################
General notes
=============
akka.Done and akka.NotUsed replacing Unit and BoxedUnit
-------------------------------------------------------
To provide more clear signatures and have a unified API for both
Java and Scala two new types have been introduced:
``akka.NotUsed`` is meant to be used instead of ``Unit`` in Scala
and ``BoxedUnit`` in Java to signify that the type parameter is required
but not actually used. This is commonly the case with ``Source``, ``Flow`` and ``Sink``
that do not materialize into any value.
``akka.Done`` is added for the use case where it is boxed inside another object to signify
completion but there is no actual value attached to the completion. It is used to replace
occurrences of ``Future<BoxedUnit>`` with ``Future<Done>`` in Java and ``Future[Unit]`` with
``Future[Done]`` in Scala.
All previous usage of ``Unit`` and ``BoxedUnit`` for these two cases in the akka streams APIs
has been updated.
This means that Scala code like this::
Source[Int, Unit] source = Source.from(1 to 5)
Sink[Int, Future[Unit]] sink = Sink.ignore()
needs to be changed into::
Source[Int, NotUsed] source = Source.from(1 to 5)
Sink[Int, Future[Done]] sink = Sink.ignore()
These changes apply to all the places where streams are used, which means that signatures
in the persistent query APIs also are affected.
Changed Operators
=================
``expand()`` is now based on an Iterator
----------------------------------------
Previously the ``expand`` combinator required two functions as input: the first
one lifted incoming values into an extrapolation state and the second one
extracted values from that, possibly evolving that state. This has been
simplified into a single function that turns the incoming element into an
Iterator.
The most prominent use-case previously was to just repeat the previously received value::
Flow[Int].expand(identity)(s => (s, s)) // This no longer works!
In Akka 2.4.x this is simplified to:
.. includecode:: ../code/docs/stream/MigrationsScala.scala#expand-continually
If state needs to be be kept during the expansion process then this state will
need to be managed by the Iterator. The example of counting the number of
expansions might previously have looked like::
// This no longer works!
Flow[Int].expand((_, 0)){ case (in, count) => (in, count) -> (in, count + 1) }
In Akka 2.4.x this is formulated like so:
.. includecode:: ../code/docs/stream/MigrationsScala.scala#expand-state