document FlowOps extension
This commit is contained in:
parent
aff158cb98
commit
5accba105d
3 changed files with 64 additions and 1 deletions
|
|
@ -519,4 +519,32 @@ class GraphStageDocSpec extends AkkaSpec {
|
|||
sub.cancel()
|
||||
}
|
||||
|
||||
"Demonstrate stream extension" when {
|
||||
|
||||
"targeting a Source" in {
|
||||
//#extending-source
|
||||
implicit class SourceDuplicator[Out, Mat](s: Source[Out, Mat]) {
|
||||
def duplicateElements: Source[Out, Mat] = s.via(new Duplicator)
|
||||
}
|
||||
|
||||
val s = Source(1 to 3).duplicateElements
|
||||
|
||||
s.runWith(Sink.seq).futureValue should ===(Seq(1, 1, 2, 2, 3, 3))
|
||||
//#extending-source
|
||||
}
|
||||
|
||||
"targeting a Flow" in {
|
||||
//#extending-flow
|
||||
implicit class FlowDuplicator[In, Out, Mat](s: Flow[In, Out, Mat]) {
|
||||
def duplicateElements: Flow[In, Out, Mat] = s.via(new Duplicator)
|
||||
}
|
||||
|
||||
val f = Flow[Int].duplicateElements
|
||||
|
||||
Source(1 to 3).via(f).runWith(Sink.seq).futureValue should ===(Seq(1, 1, 2, 2, 3, 3))
|
||||
//#extending-flow
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,8 @@ object TwitterStreamQuickstartDocSpec {
|
|||
//#model
|
||||
|
||||
//#tweet-source
|
||||
val tweets: Source[Tweet, NotUsed] //#tweet-source
|
||||
val tweets: Source[Tweet, NotUsed]
|
||||
//#tweet-source
|
||||
= Source(
|
||||
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
|
||||
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
|
||||
|
|
|
|||
|
|
@ -448,3 +448,37 @@ stage as state of an actor, and the callbacks as the ``receive`` block of the ac
|
|||
is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over**
|
||||
internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined
|
||||
behavior.
|
||||
|
||||
Extending Flow Combinators with Custom Operators
|
||||
================================================
|
||||
|
||||
The most general way of extending any :class:`Source`, :class:`Flow` or :class:`SubFlow` (e.g. from ``groupBy``) is
|
||||
demonstrated above: create a graph of flow-shape like the :class:`Duplicator` example given above and use the ``.via(...)``
|
||||
combinator to integrate it into your stream topology. This works with all :class:`FlowOps` sub-types, including the
|
||||
ports that you connect with the graph DSL.
|
||||
|
||||
Advanced Scala users may wonder whether it is possible to write extension methods that enrich :class:`FlowOps` to
|
||||
allow nicer syntax. The short answer is that Scala 2 does not support this in a fully generic fashion, the problem is
|
||||
that it is impossible to abstract over the kind of stream that is being extended because :class:`Source`, :class:`Flow`
|
||||
and :class:`SubFlow` differ in the number and kind of their type parameters. While it would be possible to write
|
||||
an implicit class that enriches them generically, this class would require explicit instantiation with all type
|
||||
parameters due to `SI-2712 <https://issues.scala-lang.org/browse/SI-2712>`_. For a partial workaround that unifies
|
||||
extensions to :class:`Source` and :class:`Flow` see `this sketch by R. Kuhn <https://gist.github.com/rkuhn/2870fcee4937dda2cad5>`_.
|
||||
|
||||
A lot simpler is the task of just adding an extension method to :class:`Source` as shown below:
|
||||
|
||||
.. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#extending-source
|
||||
|
||||
The analog works for :class:`Flow` as well:
|
||||
|
||||
.. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#extending-flow
|
||||
|
||||
If you try to write this for :class:`SubFlow`, though, you will run into the same issue as when trying to unify
|
||||
the two solutions above, only on a higher level (the type constructors needed for that unification would have rank
|
||||
two, meaning that some of their type arguments are type constructors themselves—when trying to extend the solution
|
||||
shown in the linked sketch the author encountered such a density of compiler StackOverflowErrors and IDE failures
|
||||
that he gave up).
|
||||
|
||||
It is interesting to note that a simplified form of this problem has found its way into the `dotty test suite <https://github.com/lampepfl/dotty/pull/1186/files>`_.
|
||||
Dotty is the development version of Scala on its way to Scala 3.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue