Merge pull request #20143 from akka/wip-document-flowops-extension-RK

document FlowOps extension
This commit is contained in:
Konrad Malawski 2016-03-29 12:07:30 +02:00
commit 27e3c5d925
3 changed files with 64 additions and 1 deletions

View file

@ -519,4 +519,32 @@ class GraphStageDocSpec extends AkkaSpec {
sub.cancel()
}
"Demonstrate stream extension" when {
"targeting a Source" in {
//#extending-source
implicit class SourceDuplicator[Out, Mat](s: Source[Out, Mat]) {
def duplicateElements: Source[Out, Mat] = s.via(new Duplicator)
}
val s = Source(1 to 3).duplicateElements
s.runWith(Sink.seq).futureValue should ===(Seq(1, 1, 2, 2, 3, 3))
//#extending-source
}
"targeting a Flow" in {
//#extending-flow
implicit class FlowDuplicator[In, Out, Mat](s: Flow[In, Out, Mat]) {
def duplicateElements: Flow[In, Out, Mat] = s.via(new Duplicator)
}
val f = Flow[Int].duplicateElements
Source(1 to 3).via(f).runWith(Sink.seq).futureValue should ===(Seq(1, 1, 2, 2, 3, 3))
//#extending-flow
}
}
}

View file

@ -32,7 +32,8 @@ object TwitterStreamQuickstartDocSpec {
//#model
//#tweet-source
val tweets: Source[Tweet, NotUsed] //#tweet-source
val tweets: Source[Tweet, NotUsed]
//#tweet-source
= Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::

View file

@ -448,3 +448,37 @@ stage as state of an actor, and the callbacks as the ``receive`` block of the ac
is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over**
internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined
behavior.
Extending Flow Combinators with Custom Operators
================================================
The most general way of extending any :class:`Source`, :class:`Flow` or :class:`SubFlow` (e.g. from ``groupBy``) is
demonstrated above: create a graph of flow-shape like the :class:`Duplicator` example given above and use the ``.via(...)``
combinator to integrate it into your stream topology. This works with all :class:`FlowOps` sub-types, including the
ports that you connect with the graph DSL.
Advanced Scala users may wonder whether it is possible to write extension methods that enrich :class:`FlowOps` to
allow nicer syntax. The short answer is that Scala 2 does not support this in a fully generic fashion, the problem is
that it is impossible to abstract over the kind of stream that is being extended because :class:`Source`, :class:`Flow`
and :class:`SubFlow` differ in the number and kind of their type parameters. While it would be possible to write
an implicit class that enriches them generically, this class would require explicit instantiation with all type
parameters due to `SI-2712 <https://issues.scala-lang.org/browse/SI-2712>`_. For a partial workaround that unifies
extensions to :class:`Source` and :class:`Flow` see `this sketch by R. Kuhn <https://gist.github.com/rkuhn/2870fcee4937dda2cad5>`_.
A lot simpler is the task of just adding an extension method to :class:`Source` as shown below:
.. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#extending-source
The analog works for :class:`Flow` as well:
.. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#extending-flow
If you try to write this for :class:`SubFlow`, though, you will run into the same issue as when trying to unify
the two solutions above, only on a higher level (the type constructors needed for that unification would have rank
two, meaning that some of their type arguments are type constructors themselves—when trying to extend the solution
shown in the linked sketch the author encountered such a density of compiler StackOverflowErrors and IDE failures
that he gave up).
It is interesting to note that a simplified form of this problem has found its way into the `dotty test suite <https://github.com/lampepfl/dotty/pull/1186/files>`_.
Dotty is the development version of Scala on its way to Scala 3.