diff --git a/akka-docs/src/main/paradox/stream/operators/Source/unfold.md b/akka-docs/src/main/paradox/stream/operators/Source/unfold.md index 8863d9cde6..a2640d1d8c 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/unfold.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/unfold.md @@ -1,24 +1,48 @@ # Source.unfold -Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`]. +Stream the result of a function as long as it returns a @scala[`Some`] @java[non empty `Optional`]. @ref[Source operators](../index.md#source-operators) -@@@div { .group-scala } - ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #unfold } +Scala +: @@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #unfold } -@@@ +Java +: @@snip [SourceUnfoldTest.java](/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceUnfoldTest.java) { #signature } ## Description -Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`]. The value inside the option -consists of a @scala[tuple] @java[pair] where the first value is a state passed back into the next call to the function allowing -to pass a state. The first invocation of the provided fold function will receive the `zero` state. +Stream the result of a function as long as it returns a @scala[`Some`] @java[non empty `Optional`]. The value inside the option consists of a @scala[tuple] @java[pair] where the first value is a state passed back into the next call to the function allowing to pass a state. The first invocation of the provided fold function will receive the `zero` state. + +@@@ warning + +The same `zero` state object will be used for every materialization of the `Source` so it is **mandatory** that the state is immutable. For example a `java.util.Iterator`, `Array` or Java standard library collection would not be safe as the fold operation could mutate the value. If you must use a mutable value, combining with @ref:[Source.lazySource](lazySource.md) to make sure a new mutable `zero` value is created for each materialization is one solution. + +@@@ + +Note that for unfolding a source of elements through a blocking API, such as a network or filesystem resource you should prefer using @ref:[unfoldResource](unfoldResource.md). + +## Examples + +This first sample starts at a user provided integer and counts down to zero using `unfold` : + +Scala + : @@snip [Unfold.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Unfold.scala) { #countdown } + +Java + : @@snip [Unfold.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Unfold.java) { #countdown } + + +It is also possible to express unfolds that don't have an end, which will never return @scala[`None`] @java[`Optional.empty`] and must be combined with for example `.take(n)` to not produce infinite streams. Here we have implemented the Fibonacci numbers (0, 1, 1, 2, 3, 5, 8, 13, etc) with `unfold`: + +Scala + : @@snip [Unfold.scala](/akka-docs/src/test/scala/docs/stream/operators/source/Unfold.scala) { #fibonacci } + +Java + : @@snip [Unfold.java](/akka-docs/src/test/java/jdocs/stream/operators/source/Unfold.java) { #fibonacci } -Can be used to implement many stateful sources without having to touch the more low level @ref[`GraphStage`](../../stream-customize.md) API. ## Reactive Streams semantics diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 7e8c01eab4..370cacdfac 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -40,7 +40,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly| |Source|@ref[single](Source/single.md)|Stream a single object| |Source|@ref[tick](Source/tick.md)|A periodical repetition of an arbitrary object.| -|Source|@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].| +|Source|@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[non empty `Optional`].| |Source|@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].| |Source|@ref[unfoldResource](Source/unfoldResource.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.| |Source|@ref[unfoldResourceAsync](Source/unfoldResourceAsync.md)|Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/Unfold.java b/akka-docs/src/test/java/jdocs/stream/operators/source/Unfold.java new file mode 100644 index 0000000000..f14c998bc8 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/Unfold.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.javadsl.Source; + +import java.math.BigInteger; +import java.util.Optional; + +interface Unfold { + + // #countdown + public static Source countDown(Integer from) { + return Source.unfold( + from, + current -> { + if (current == 0) return Optional.empty(); + else return Optional.of(Pair.create(current - 1, current)); + }); + } + // #countdown + + // #fibonacci + public static Source fibonacci() { + return Source.unfold( + Pair.create(BigInteger.ZERO, BigInteger.ONE), + current -> { + BigInteger a = current.first(); + BigInteger b = current.second(); + Pair next = Pair.create(b, a.add(b)); + return Optional.of(Pair.create(next, a)); + }); + } + // #fibonacci + +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/Unfold.scala b/akka-docs/src/test/scala/docs/stream/operators/source/Unfold.scala new file mode 100644 index 0000000000..fedb86095c --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/Unfold.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.NotUsed +import akka.stream.scaladsl.Source + +object Unfold { + + // #countdown + def countDown(from: Int): Source[Int, NotUsed] = + Source.unfold(from) { current => + if (current == 0) None + else Some((current - 1, current)) + } + // #countdown + + // #fibonacci + def fibonacci: Source[BigInt, NotUsed] = + Source.unfold((BigInt(0), BigInt(1))) { + case (a, b) => + Some(((b, a + b), a)) + } + // #fibonacci + +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceUnfoldTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceUnfoldTest.java new file mode 100644 index 0000000000..87065bc35d --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceUnfoldTest.java @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.japi.function.Function; + +import java.util.Optional; + +public class SourceUnfoldTest { + + public static // #signature + Source unfold(S zero, Function>> f) + // #signature + { + return Source.unfold(zero, f); + } +}