Docs for lazy stream operators (#28897)

Johan Andrén 2020-05-15 12:03:27 +02:00 committed by GitHub
parent 5dc56a6d8c
commit ac3065bfad
9 changed files with 483 additions and 14 deletions

@ -11,11 +11,45 @@ Defers creation and materialization of a `Flow` until there is a first element.
 ## Description
-When the first element comes from upstream the actual `Flow` is created and materialized.
-The internal `Flow` will not be created if there are no elements on completion or failure of up or downstream.
-The materialized value of the `Flow` will be the materialized value of the created internal flow if it is materialized
-and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized.
+Defers `Flow` creation and materialization until the first element arrives at the `lazyFlow` from upstream. After
+that the stream behaves as if the nested flow replaced the `lazyFlow`.
+The nested `Flow` will not be created if the outer flow completes or fails before any elements arrive.
+Note that asynchronous boundaries and many other operators in the stream may do pre-fetching or trigger demand and thereby make an early element come through the stream, leading to creation of the inner flow earlier than you would expect.
+The materialized value of the `Flow` is a @scala[`Future`]@java[`CompletionStage`] that is completed with the
+materialized value of the nested flow once that is constructed.
+See also:
+* @ref:[flatMapPrefix](../Source-or-Flow/flatMapPrefix.md)
+* @ref:[Flow.lazyFutureFlow](lazyFutureFlow.md) and @ref:[Flow.lazyCompletionStageFlow](lazyCompletionStageFlow.md)
+* @ref:[Source.lazySource](../Source/lazySource.md)
+* @ref:[Sink.lazySink](../Sink/lazySink.md)
+## Examples
+In this sample we produce a short sequence of numbers and print to standard out as a side effect, so that the order in
+which things happen becomes visible. Note how producing the first value in the `Source` happens before the creation of the flow:
+Scala
+: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala) { #simple-example }
+Java
+: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java) { #simple-example }
+Since the factory is called once per stream materialization it can be used to safely construct a mutable object to
+use with the actual deferred `Flow`. In this example we fold elements into an `ArrayList` created inside the lazy
+flow factory:
+Scala
+: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/Lazy.scala) { #mutable-example }
+Java
+: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/Lazy.java) { #mutable-example }
+If we had instead used `fold` directly with an `ArrayList` we would have shared the same list across
+all materializations and, even worse, unsafely across threads.
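The sharing hazard described in the last paragraph can be sketched without Akka. The following plain-Java illustration is not part of the commit and does not use any Akka API: the class `SharedVersusFresh` and its methods are hypothetical names, a `Supplier` stands in for a reusable stream blueprint, and each call to `get()` stands in for one materialization. It only shows the single-threaded part of the problem (state leaking between runs); the thread-safety issue comes on top of that when runs execute concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in, not Akka API: each get() plays the role of one materialization
// that folds the elements 1..3 into a list.
class SharedVersusFresh {

  // The accumulator is created once, when the blueprint is built,
  // so every run appends to the same list (the analogue of passing an ArrayList to fold directly).
  static Supplier<List<Integer>> sharedZero() {
    List<Integer> zero = new ArrayList<>();
    return () -> fold(zero);
  }

  // The accumulator is created inside the factory, once per run
  // (the analogue of creating the ArrayList inside the lazy flow factory).
  static Supplier<List<Integer>> freshZero() {
    return () -> fold(new ArrayList<>());
  }

  private static List<Integer> fold(List<Integer> list) {
    for (int element = 1; element <= 3; element++) {
      list.add(element);
    }
    return list;
  }

  public static void main(String[] args) {
    Supplier<List<Integer>> shared = sharedZero();
    shared.get();
    System.out.println(shared.get()); // [1, 2, 3, 1, 2, 3]

    Supplier<List<Integer>> fresh = freshZero();
    fresh.get();
    System.out.println(fresh.get()); // [1, 2, 3]
  }
}
```

With the shared accumulator the second run sees the elements of the first, which is the behaviour the `#mutable-example` snippet avoids by allocating the list inside the factory.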
 ## Reactive Streams semantics
@ -34,4 +68,3 @@ and failed with a `akka.stream.NeverMaterializedException` if the stream fails o
 This behaviour can be controlled by setting the [[akka.stream.Attributes.NestedMaterializationCancellationPolicy.PropagateToNested]] attribute,
 this will delay downstream cancellation until nested flow's materialization which is then immediately cancelled (with the original cancellation cause).
 @@@

@ -11,15 +11,33 @@ Defers creation and materialization of a `Sink` until there is a first element.
 ## Description
-When the first element comes from upstream the actual `Sink` is created and materialized.
-The internal `Sink` will not be created if the stream completes of fails before any element got through.
-The materialized value of the `Sink` will be the materialized value of the created internal flow if it is materialized
-and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized.
+Defers `Sink` creation and materialization until the first element arrives from upstream at the `lazySink`. After
+that the stream behaves as if the nested sink replaced the `lazySink`.
+The nested `Sink` will not be created if upstream completes or fails without any elements arriving at the sink.
+The materialized value of the `Sink` is a @scala[`Future`]@java[`CompletionStage`] that is completed with the
+materialized value of the nested sink once that is constructed.
 Can be combined with @ref[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element.
-See also @ref:[lazyFutureSink](lazyFutureSink.md) and @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md).
+See also:
+* @ref:[Sink.lazyFutureSink](lazyFutureSink.md) and @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md)
+* @ref:[Source.lazySource](../Source/lazySource.md)
+* @ref:[Flow.lazyFlow](../Flow/lazyFlow.md)
+## Examples
+In this example we side effect from `Flow.map`, the sink factory and `Sink.foreach` so that the order becomes visible;
+the nested sink is only created once the element has passed `map`:
+Scala
+: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/sink/Lazy.scala) { #simple-example }
+Java
+: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/sink/Lazy.java) { #simple-example }
 ## Reactive Streams semantics

@ -12,12 +12,50 @@ Defers creation and materialization of a `Source` until there is demand.
 ## Description
 Defers creation and materialization of a `Source` until there is demand, then emits the elements from the source
-downstream just like if it had been created up front.
-See also @ref:[lazyFutureSource](lazyFutureSource.md).
-Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts
-the laziness and will trigger the factory immediately.
+downstream just as if it had been created up front. If the stream fails or cancels before there is demand the factory will not be invoked.
+Note that asynchronous boundaries and many other operators in the stream may do pre-fetching or trigger demand earlier
+than you would expect.
+The materialized value of the `lazySource` is a @scala[`Future`]@java[`CompletionStage`] that is completed with the
+materialized value of the nested source once that is constructed.
+See also:
+* @ref:[Source.lazyFutureSource](lazyFutureSource.md) and @ref:[Source.lazyCompletionStageSource](lazyCompletionStageSource.md)
+* @ref:[Flow.lazyFlow](../Flow/lazyFlow.md)
+* @ref:[Sink.lazySink](../Sink/lazySink.md)
+## Example
+In this example you might expect the expensive source not to be constructed until `.pull` is called. However,
+since `Sink.queue` has a buffer and signals demand for it immediately on materialization, the expensive source is created
+quickly after the stream has been materialized:
+Scala
+: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala) { #not-a-good-example }
+Java
+: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java) { #not-a-good-example }
+Instead the most useful aspect of the operator is that the factory is called once per stream materialization,
+which means that it can be used to safely construct a mutable object to use with the actual deferred source.
+In this example we make use of that by unfolding a mutable object that works like an iterator, with one method to say if
+there are more elements and one that produces the next element and advances past it.
+If the `IteratorLikeThing` were used directly in a `Source.unfold` the same instance would end up being unsafely shared
+across all three materializations of the stream, but wrapping it with `Source.lazySource` ensures we create a separate instance
+for each of the started streams:
+Scala
+: @@snip [Lazy.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Lazy.scala) { #one-per-materialization }
+Java
+: @@snip [Lazy.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Lazy.java) { #one-per-materialization }
+Note though that you can often achieve the same using @ref:[unfoldResource](unfoldResource.md). If you have an actual `Iterator`
+you should prefer @ref:[fromIterator](fromIterator.md).
 ## Reactive Streams semantics

@ -0,0 +1,87 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.flow;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

public class Lazy {
  private ActorSystem system = null;

  void example() {
    // #simple-example
    Source<Integer, NotUsed> numbers =
        Source.unfold(
                0,
                n -> {
                  int next = n + 1;
                  System.out.println("Source producing " + next);
                  return Optional.of(Pair.create(next, next));
                })
            .take(3);

    Flow<Integer, Integer, CompletionStage<NotUsed>> flow =
        Flow.lazyFlow(
            () -> {
              System.out.println("Creating the actual flow");
              return Flow.fromFunction(
                  element -> {
                    System.out.println("Actual flow mapped " + element);
                    return element;
                  });
            });

    numbers.via(flow).run(system);
    // prints:
    // Source producing 1
    // Creating the actual flow
    // Actual flow mapped 1
    // Source producing 2
    // Actual flow mapped 2
    // #simple-example
  }

  void statefulMap() {
    // #mutable-example
    Flow<Integer, List<Integer>, CompletionStage<NotUsed>> mutableFold =
        Flow.lazyFlow(
            () -> {
              List<Integer> zero = new ArrayList<>();

              return Flow.of(Integer.class)
                  .fold(
                      zero,
                      (list, element) -> {
                        list.add(element);
                        return list;
                      });
            });

    RunnableGraph<NotUsed> stream =
        Source.range(1, 3).via(mutableFold).to(Sink.foreach(System.out::println));

    stream.run(system);
    stream.run(system);
    stream.run(system);
    // prints:
    // [1, 2, 3]
    // [1, 2, 3]
    // [1, 2, 3]
    // #mutable-example
  }
}

@ -0,0 +1,50 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sink;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

public class Lazy {

  private ActorSystem system = null;

  void example() {
    // #simple-example
    CompletionStage<Optional<String>> matVal =
        Source.<String>maybe()
            .map(
                element -> {
                  System.out.println("mapped " + element);
                  return element;
                })
            .toMat(
                Sink.lazySink(
                    () -> {
                      System.out.println("Sink created");
                      return Sink.foreach(elem -> System.out.println("foreach " + elem));
                    }),
                Keep.left())
            .run(system);

    // some time passes
    // nothing has been printed
    matVal.toCompletableFuture().complete(Optional.of("one"));
    // now prints:
    // mapped one
    // Sink created
    // foreach one
    // #simple-example
  }
}

@ -0,0 +1,77 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.source;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.SinkQueueWithCancel;
import akka.stream.javadsl.Source;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

public class Lazy {

  private ActorSystem system = null;

  private Source<String, NotUsed> createExpensiveSource() {
    throw new UnsupportedOperationException("Not implemented in sample");
  }

  void notReallyThatLazy() {
    // #not-a-good-example
    Source<String, CompletionStage<NotUsed>> source =
        Source.lazySource(
            () -> {
              System.out.println("Creating the actual source");
              return createExpensiveSource();
            });

    SinkQueueWithCancel<String> queue = source.runWith(Sink.queue(), system);

    // ... time passes ...
    // at some point we pull for the first time,
    // but the source creation may already have been triggered
    queue.pull();
    // #not-a-good-example
  }

  static class IteratorLikeThing {
    boolean thereAreMore() {
      throw new UnsupportedOperationException("Not implemented in sample");
    }

    String extractNext() {
      throw new UnsupportedOperationException("Not implemented in sample");
    }
  }

  void safeMutableSource() {
    // #one-per-materialization
    RunnableGraph<CompletionStage<NotUsed>> stream =
        Source.lazySource(
                () -> {
                  IteratorLikeThing instance = new IteratorLikeThing();
                  return Source.unfold(
                      instance,
                      sameInstance -> {
                        if (sameInstance.thereAreMore())
                          return Optional.of(Pair.create(sameInstance, sameInstance.extractNext()));
                        else return Optional.empty();
                      });
                })
            .to(Sink.foreach(System.out::println));

    // each of the three materializations has its own instance of IteratorLikeThing
    stream.run(system);
    stream.run(system);
    stream.run(system);
    // #one-per-materialization
  }
}

@ -0,0 +1,69 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.flow
import java.util

import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object Lazy {

  implicit val system: ActorSystem = ???

  def example(): Unit = {
    // #simple-example
    val numbers = Source
      .unfold(0) { n =>
        val next = n + 1
        println(s"Source producing $next")
        Some((next, next))
      }
      .take(3)

    val flow = Flow.lazyFlow { () =>
      println("Creating the actual flow")
      Flow[Int].map { element =>
        println(s"Actual flow mapped $element")
        element
      }
    }

    numbers.via(flow).run()
    // prints:
    // Source producing 1
    // Creating the actual flow
    // Actual flow mapped 1
    // Source producing 2
    // Actual flow mapped 2
    // #simple-example
  }

  def statefulMap(): Unit = {
    // #mutable-example
    val mutableFold = Flow.lazyFlow { () =>
      val zero = new util.ArrayList[Int]()
      Flow[Int].fold(zero) { (list, element) =>
        list.add(element)
        list
      }
    }
    val stream =
      Source(1 to 3).via(mutableFold).to(Sink.foreach(println))

    stream.run()
    stream.run()
    stream.run()
    // prints:
    // [1, 2, 3]
    // [1, 2, 3]
    // [1, 2, 3]
    // #mutable-example
  }
}

@ -0,0 +1,41 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sink
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object Lazy {

  implicit val system: ActorSystem = ???

  def example(): Unit = {
    // #simple-example
    val matVal =
      Source
        .maybe[String]
        .map { element =>
          println(s"mapped $element")
          element
        }
        .toMat(Sink.lazySink { () =>
          println("Sink created")
          Sink.foreach(elem => println(s"foreach $elem"))
        })(Keep.left)
        .run()

    // some time passes
    // nothing has been printed
    matVal.success(Some("one"))
    // now prints:
    // mapped one
    // Sink created
    // foreach one
    // #simple-example
  }
}

@ -0,0 +1,56 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.source
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object Lazy {

  implicit val system: ActorSystem = ???

  def createExpensiveSource(): Source[String, NotUsed] = ???

  def notReallyThatLazy(): Unit = {
    // #not-a-good-example
    val source = Source.lazySource { () =>
      println("Creating the actual source")
      createExpensiveSource()
    }

    val queue = source.runWith(Sink.queue())

    // ... time passes ...
    // at some point we pull for the first time,
    // but the source creation may already have been triggered
    queue.pull()
    // #not-a-good-example
  }

  class IteratorLikeThing {
    def thereAreMore: Boolean = ???
    def extractNext: String = ???
  }

  def safeMutableSource(): Unit = {
    // #one-per-materialization
    val stream = Source
      .lazySource { () =>
        val iteratorLike = new IteratorLikeThing
        Source.unfold(iteratorLike) { iteratorLike =>
          if (iteratorLike.thereAreMore) Some((iteratorLike, iteratorLike.extractNext))
          else None
        }
      }
      .to(Sink.foreach(println))

    // each of the three materializations has its own instance of IteratorLikeThing
    stream.run()
    stream.run()
    stream.run()
    // #one-per-materialization
  }
}