From ac3065bfad4d2176d52eb6b85cfab3998615261f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 15 May 2020 12:03:27 +0200 Subject: [PATCH] Docs for lazy stream operators (#28897) --- .../paradox/stream/operators/Flow/lazyFlow.md | 43 +++++++-- .../paradox/stream/operators/Sink/lazySink.md | 28 ++++-- .../stream/operators/Source/lazySource.md | 46 +++++++++- .../jdocs/stream/operators/flow/Lazy.java | 87 +++++++++++++++++++ .../jdocs/stream/operators/sink/Lazy.java | 50 +++++++++++ .../jdocs/stream/operators/source/Lazy.java | 77 ++++++++++++++++ .../docs/stream/operators/flow/Lazy.scala | 69 +++++++++++++++ .../docs/stream/operators/sink/Lazy.scala | 41 +++++++++ .../docs/stream/operators/source/Lazy.scala | 56 ++++++++++++ 9 files changed, 483 insertions(+), 14 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sink/Lazy.java create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sink/Lazy.scala create mode 100644 akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md index f4abc65522..950ac5d625 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md @@ -11,11 +11,45 @@ Defers creation and materialization of a `Flow` until there is a first element. ## Description -When the first element comes from upstream the actual `Flow` is created and materialized. -The internal `Flow` will not be created if there are no elements on completion or failure of up or downstream. +Defers `Flow` creation and materialization until when the first element arrives at the `lazyFlow` from upstream. After +that the stream behaves as if the nested flow replaced the `lazyFlow`. +The nested `Flow` will not be created if the outer flow completes or fails before any elements arrive. -The materialized value of the `Flow` will be the materialized value of the created internal flow if it is materialized -and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. +Note that asynchronous boundaries and many other operators in the stream may do pre-fetching or trigger demand and thereby making an early element come throught the stream leading to creation of the inner flow earlier than you would expect. + +The materialized value of the `Flow` is a @scala[`Future`]@java[`CompletionStage`] that is completed with the +materialized value of the nested flow once that is constructed. + +See also: + + * @ref:[flatMapPrefix](../Source-or-Flow/flatMapPrefix.md) + * @ref:[Flow.lazyFutureFlow](lazyFutureFlow.md) and @ref:[Flow.lazyCompletionStageFlow](lazyCompletionStageFlow.md) + * @ref:[Source.lazySource](../Source/lazySource.md) + * @ref:[Sink.lazySink](../Sink/lazySink.md) + +## Examples + +In this sample we produce a short sequence of numbers, mostly to side effect and write to standard out to see in which +order things happen. Note how producing the first value in the `Source` happens before the creation of the flow: + +Scala +: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala) { #simple-example } + +Java +: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java) { #simple-example } + +Since the factory is called once per stream materialization it can be used to safely construct a mutable object to +use with the actual deferred `Flow`. In this example we fold elements into an `ArrayList` created inside the lazy +flow factory: + +Scala +: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala) { #mutable-example } + +Java +: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java) { #mutable-example } + +If we instead had used `fold` directly with an `ArrayList` we would have shared the same list across +all materialization and what is even worse, unsafely across threads. ## Reactive Streams semantics @@ -34,4 +68,3 @@ and failed with a `akka.stream.NeverMaterializedException` if the stream fails o This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute, this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause). @@@ - diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md b/akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md index b3bd22bd43..7639b3c675 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md @@ -11,15 +11,33 @@ Defers creation and materialization of a `Sink` until there is a first element. ## Description -When the first element comes from upstream the actual `Sink` is created and materialized. -The internal `Sink` will not be created if the stream completes of fails before any element got through. +Defers `Sink` creation and materialization until when the first element arrives from upstream to the `lazySink`. After +that the stream behaves as if the nested sink replaced the `lazySink`. +The nested `Sink` will not be created if upstream completes or fails without any elements arriving at the sink. -The materialized value of the `Sink` will be the materialized value of the created internal flow if it is materialized -and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. +The materialized value of the `Sink` is a @scala[`Future`]@java[`CompletionStage`] that is completed with the +materialized value of the nested sink once that is constructed. Can be combined with @ref[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element. -See also @ref:[lazyFutureSink](lazyFutureSink.md) and @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md). +See also: + + * @ref:[Sink.lazyFutureSink](lazyFutureSink.md) and @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md). + * @ref:[Source.lazySource](../Source/lazySource.md) + * @ref:[Flow.lazyFlow](../Flow/lazyFlow.md) + +## Examples + +In this example we side effect from `Flow.map`, the sink factory and `Sink.foreach` so that the order becomes visible, +the nested sink is only created once the element has passed `map`: + +Scala +: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/sink/Lazy.scala) { #simple-example } + +Java +: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/sink/Lazy.java) { #simple-example } + + ## Reactive Streams semantics diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazySource.md b/akka-docs/src/main/paradox/stream/operators/Source/lazySource.md index 7b3e429a56..42585bb5a3 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/lazySource.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazySource.md @@ -12,12 +12,50 @@ Defers creation and materialization of a `Source` until there is demand. ## Description Defers creation and materialization of a `Source` until there is demand, then emits the elements from the source -downstream just like if it had been created up front. +downstream just like if it had been created up front. If the stream fails or cancels before there is demand the factory will not be invoked. -See also @ref:[lazyFutureSource](lazyFutureSource.md). +Note that asynchronous boundaries and many other operators in the stream may do pre-fetching or trigger demand earlier +than you would expect. -Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts -the laziness and will trigger the factory immediately. +The materialized value of the `lazy` is a @scala[`Future`]@java[`CompletionStage`] that is completed with the +materialized value of the nested source once that is constructed. + +See also: + + * @ref:[Source.lazyFutureSource](lazyFutureSource.md) and @ref:[Source.lazyCompletionStageSource](lazyCompletionStageSource.md) + * @ref:[Flow.lazyFlow](../Flow/lazyFlow.md) + * @ref:[Sink.lazySink](../Sink/lazySink.md) + +## Example + +In this example you might expect this sample to not construct the expensive source until `.pull` is called. However, +since `Sink.queue` has a buffer and will ask for that immediately on materialization the expensive source is in created +quickly after the stream has been materialized: + +Scala +: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala) { #not-a-good-example } + +Java +: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java) { #not-a-good-example } + +Instead the most useful aspect of the operator is that the factory is called once per stream materialization +which means that it can be used to safely construct a mutable object to use with the actual deferred source. + +In this example we make use of that by unfolding a mutable object that works like an iterator with a method to say if +there are more elements and one that produces the next and moves to the next element. + +If the `IteratorLikeThing` was used directly in a `Source.unfold` the same instance would end up being unsafely shared +across all three materializations of the stream, but wrapping it with `Source.lazy` ensures we create a separate instance +for each of the started streams: + +Scala +: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala) { #one-per-materialization } + +Java +: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java) { #one-per-materialization } + +Note though that you can often also achieve the same using @ref:[unfoldResource](unfoldResource.md). If you have an actual `Iterator` +you should prefer @ref:[fromIterator](fromIterator.md). ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java b/akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java new file mode 100644 index 0000000000..b503475e5c --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.flow; +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.RunnableGraph; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + +public class Lazy { + private ActorSystem system = null; + + void example() { + // #simple-example + Source numbers = + Source.unfold( + 0, + n -> { + int next = n + 1; + System.out.println("Source producing " + next); + return Optional.of(Pair.create(next, next)); + }) + .take(3); + + Flow> flow = + Flow.lazyFlow( + () -> { + System.out.println("Creating the actual flow"); + return Flow.fromFunction( + element -> { + System.out.println("Actual flow mapped " + element); + return element; + }); + }); + + numbers.via(flow).run(system); + // prints: + // Source producing 1 + // Creating the actual flow + // Actual flow mapped 1 + // Source producing 2 + // Actual flow mapped 2 + // #simple-example + } + + void statefulMap() { + // #mutable-example + Flow, CompletionStage> mutableFold = + Flow.lazyFlow( + () -> { + List zero = new ArrayList<>(); + + return Flow.of(Integer.class) + .fold( + zero, + (list, element) -> { + list.add(element); + return list; + }); + }); + + RunnableGraph stream = + Source.range(1, 3).via(mutableFold).to(Sink.foreach(System.out::println)); + + stream.run(system); + stream.run(system); + stream.run(system); + // prints: + // [1, 2, 3] + // [1, 2, 3] + // [1, 2, 3] + // #mutable-example + } +} diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sink/Lazy.java b/akka-docs/src/test/java/jdocs/stream/operators/sink/Lazy.java new file mode 100644 index 0000000000..f3b30f0f17 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sink/Lazy.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sink; +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +import akka.actor.ActorSystem; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.util.Optional; +import java.util.concurrent.CompletionStage; + +public class Lazy { + + private ActorSystem system = null; + + void example() { + // #simple-example + CompletionStage> matVal = + Source.maybe() + .map( + element -> { + System.out.println("mapped " + element); + return element; + }) + .toMat( + Sink.lazySink( + () -> { + System.out.println("Sink created"); + return Sink.foreach(elem -> System.out.println("foreach " + elem)); + }), + Keep.left()) + .run(system); + + // some time passes + // nothing has been printed + matVal.toCompletableFuture().complete(Optional.of("one")); + // now prints: + // mapped one + // Sink created + // foreach one + + // #simple-example + } +} diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java b/akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java new file mode 100644 index 0000000000..b8695b102f --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.stream.javadsl.RunnableGraph; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.SinkQueueWithCancel; +import akka.stream.javadsl.Source; + +import java.util.Optional; +import java.util.concurrent.CompletionStage; + +public class Lazy { + + private ActorSystem system = null; + + private Source createExpensiveSource() { + throw new UnsupportedOperationException("Not implemented in sample"); + } + + void notReallyThatLazy() { + // #not-a-good-example + Source> source = + Source.lazySource( + () -> { + System.out.println("Creating the actual source"); + return createExpensiveSource(); + }); + + SinkQueueWithCancel queue = source.runWith(Sink.queue(), system); + + // ... time passes ... + // at some point in time we pull the first time + // but the source creation may already have been triggered + queue.pull(); + // #not-a-good-example + } + + static class IteratorLikeThing { + boolean thereAreMore() { + throw new UnsupportedOperationException("Not implemented in sample"); + } + + String extractNext() { + throw new UnsupportedOperationException("Not implemented in sample"); + } + } + + void safeMutableSource() { + // #one-per-materialization + RunnableGraph> stream = + Source.lazySource( + () -> { + IteratorLikeThing instance = new IteratorLikeThing(); + return Source.unfold( + instance, + sameInstance -> { + if (sameInstance.thereAreMore()) + return Optional.of(Pair.create(sameInstance, sameInstance.extractNext())); + else return Optional.empty(); + }); + }) + .to(Sink.foreach(System.out::println)); + + // each of the three materializations will have their own instance of IteratorLikeThing + stream.run(system); + stream.run(system); + stream.run(system); + // #one-per-materialization + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala b/akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala new file mode 100644 index 0000000000..a619b03418 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.stream.operators.flow + +import java.util + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +object Lazy { + + implicit val system: ActorSystem = ??? + + def example(): Unit = { + // #simple-example + val numbers = Source + .unfold(0) { n => + val next = n + 1 + println(s"Source producing $next") + Some((next, next)) + } + .take(3) + + val flow = Flow.lazyFlow { () => + println("Creating the actual flow") + Flow[Int].map { element => + println(s"Actual flow mapped $element") + element + } + } + + numbers.via(flow).run() + // prints: + // Source producing 1 + // Creating the actual flow + // Actual flow mapped 1 + // Source producing 2 + // Actual flow mapped 2 + // #simple-example + } + + def statefulMap(): Unit = { + // #mutable-example + val mutableFold = Flow.lazyFlow { () => + val zero = new util.ArrayList[Int]() + Flow[Int].fold(zero) { (list, element) => + list.add(element) + list + } + } + val stream = + Source(1 to 3).via(mutableFold).to(Sink.foreach(println)) + + stream.run() + stream.run() + stream.run() + // prints: + // [1, 2, 3] + // [1, 2, 3] + // [1, 2, 3] + + // #mutable-example + } + +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sink/Lazy.scala b/akka-docs/src/test/scala/docs/stream/operators/sink/Lazy.scala new file mode 100644 index 0000000000..6587325cbc --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sink/Lazy.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.stream.operators.sink + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +object Lazy { + + implicit val system: ActorSystem = ??? + + def example(): Unit = { + // #simple-example + val matVal = + Source + .maybe[String] + .map { element => + println(s"mapped $element") + element + } + .toMat(Sink.lazySink { () => + println("Sink created") + Sink.foreach(elem => println(s"foreach $elem")) + })(Keep.left) + .run() + + // some time passes + // nothing has been printed + matVal.success(Some("one")) + // now prints: + // mapped one + // Sink created + // foreach one + + // #simple-example + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala b/akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala new file mode 100644 index 0000000000..568ce3829e --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +object Lazy { + + implicit val system: ActorSystem = ??? + + def createExpensiveSource(): Source[String, NotUsed] = ??? + + def notReallyThatLazy(): Unit = { + // #not-a-good-example + val source = Source.lazySource { () => + println("Creating the actual source") + createExpensiveSource() + } + + val queue = source.runWith(Sink.queue()) + + // ... time passes ... + // at some point in time we pull the first time + // but the source creation may already have been triggered + queue.pull() + // #not-a-good-example + } + + class IteratorLikeThing { + def thereAreMore: Boolean = ??? + def extractNext: String = ??? + } + def safeMutableSource(): Unit = { + // #one-per-materialization + val stream = Source + .lazySource { () => + val iteratorLike = new IteratorLikeThing + Source.unfold(iteratorLike) { iteratorLike => + if (iteratorLike.thereAreMore) Some((iteratorLike, iteratorLike.extractNext)) + else None + } + } + .to(Sink.foreach(println)) + + // each of the three materializations will have their own instance of IteratorLikeThing + stream.run() + stream.run() + stream.run() + // #one-per-materialization + } +}