diff --git a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala index fee76a9bcb..0defd9a17e 100644 --- a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala @@ -519,4 +519,32 @@ class GraphStageDocSpec extends AkkaSpec { sub.cancel() } + "Demonstrate stream extension" when { + + "targeting a Source" in { + //#extending-source + implicit class SourceDuplicator[Out, Mat](s: Source[Out, Mat]) { + def duplicateElements: Source[Out, Mat] = s.via(new Duplicator) + } + + val s = Source(1 to 3).duplicateElements + + s.runWith(Sink.seq).futureValue should ===(Seq(1, 1, 2, 2, 3, 3)) + //#extending-source + } + + "targeting a Flow" in { + //#extending-flow + implicit class FlowDuplicator[In, Out, Mat](s: Flow[In, Out, Mat]) { + def duplicateElements: Flow[In, Out, Mat] = s.via(new Duplicator) + } + + val f = Flow[Int].duplicateElements + + Source(1 to 3).via(f).runWith(Sink.seq).futureValue should ===(Seq(1, 1, 2, 2, 3, 3)) + //#extending-flow + } + + } + } diff --git a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index edf4828aa8..818368a044 100644 --- a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -32,7 +32,8 @@ object TwitterStreamQuickstartDocSpec { //#model //#tweet-source - val tweets: Source[Tweet, NotUsed] //#tweet-source + val tweets: Source[Tweet, NotUsed] + //#tweet-source = Source( Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") :: diff --git a/akka-docs/rst/scala/stream/stream-customize.rst b/akka-docs/rst/scala/stream/stream-customize.rst index cfe05cb301..b21978c932 100644 --- a/akka-docs/rst/scala/stream/stream-customize.rst +++ b/akka-docs/rst/scala/stream/stream-customize.rst @@ -448,3 +448,37 @@ stage as state of an actor, and the callbacks as the ``receive`` block of the ac is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined behavior. + +Extending Flow Combinators with Custom Operators +================================================ + +The most general way of extending any :class:`Source`, :class:`Flow` or :class:`SubFlow` (e.g. from ``groupBy``) is +demonstrated above: create a graph of flow-shape like the :class:`Duplicator` example given above and use the ``.via(...)`` +combinator to integrate it into your stream topology. This works with all :class:`FlowOps` sub-types, including the +ports that you connect with the graph DSL. + +Advanced Scala users may wonder whether it is possible to write extension methods that enrich :class:`FlowOps` to +allow nicer syntax. The short answer is that Scala 2 does not support this in a fully generic fashion, the problem is +that it is impossible to abstract over the kind of stream that is being extended because :class:`Source`, :class:`Flow` +and :class:`SubFlow` differ in the number and kind of their type parameters. While it would be possible to write +an implicit class that enriches them generically, this class would require explicit instantiation with all type +parameters due to `SI-2712 `_. For a partial workaround that unifies +extensions to :class:`Source` and :class:`Flow` see `this sketch by R. Kuhn `_. + +A lot simpler is the task of just adding an extension method to :class:`Source` as shown below: + +.. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#extending-source + +The analog works for :class:`Flow` as well: + +.. includecode:: ../code/docs/stream/GraphStageDocSpec.scala#extending-flow + +If you try to write this for :class:`SubFlow`, though, you will run into the same issue as when trying to unify +the two solutions above, only on a higher level (the type constructors needed for that unification would have rank +two, meaning that some of their type arguments are type constructors themselves—when trying to extend the solution +shown in the linked sketch the author encountered such a density of compiler StackOverflowErrors and IDE failures +that he gave up). + +It is interesting to note that a simplified form of this problem has found its way into the `dotty test suite `_. +Dotty is the development version of Scala on its way to Scala 3. +