diff --git a/akka-docs/src/main/paradox/stream/stream-cookbook.md b/akka-docs/src/main/paradox/stream/stream-cookbook.md index 173efd7571..3adc6682af 100644 --- a/akka-docs/src/main/paradox/stream/stream-cookbook.md +++ b/akka-docs/src/main/paradox/stream/stream-cookbook.md @@ -40,6 +40,22 @@ Scala Java : @@snip [RecipeLoggingElements.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeLoggingElements.java) { #log-custom } +### Creating a source that continuously evaluates a function + +**Situation:** A source is required that continuously provides elements obtained by evaluating a given function, so long as there is demand. + +The simplest implementation is to use a `Source.repeat` that produces some arbitrary element - e.g. `NotUsed` - +and then map those elements to the function evaluation. E.g. if we have some `builderFunction()`, we can use: + +Scala +: @@snip [RecipeSourceFromFunction.scala]($code$/scala/docs/stream/cookbook/RecipeSourceFromFunction.scala) { #source-from-function } + +Java +: @@snip [RecipeSourceFromFunction.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeSourceFromFunction.java) { #source-from-function } + +Note: if the element-builder function touches mutable state, then a guaranteed single-threaded source should be used +instead; e.g. `Source.unfold` or `Source.unfoldResource`. + ### Flattening a stream of sequences **Situation:** A stream is given as a stream of sequence of elements, but a stream of elements needed instead, streaming diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSourceFromFunction.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSourceFromFunction.java new file mode 100644 index 0000000000..7c621156e4 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeSourceFromFunction.java @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package jdocs.stream.javadsl.cookbook; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.testkit.javadsl.TestKit; +import com.typesafe.config.ConfigFactory; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class RecipeSourceFromFunction extends RecipeTest { + static ActorSystem system; + static Materializer mat; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("RecipeSourceFromFunction", ConfigFactory.parseString("akka.loglevel=DEBUG\nakka.loggers = [akka.testkit.TestEventListener]")); + mat = ActorMaterializer.create(system); + } + + @AfterClass + public static void tearDown() { + TestKit.shutdownActorSystem(system); + system = null; + mat = null; + } + + @Test + public void beMappingOfRepeat() throws Exception { + new TestKit(system) { + final String builderFunction() { + return UUID.randomUUID().toString(); + } + + { + //#source-from-function + final Source source = Source + .repeat(NotUsed.getInstance()) + .map(elem -> builderFunction()); + //#source-from-function + + final List result = source + .take(2) + .runWith(Sink.seq(), mat) + .toCompletableFuture().get(3, TimeUnit.SECONDS); + + Assert.assertEquals(2, result.size()); + } + }; + } +} diff --git a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSourceFromFunction.scala b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSourceFromFunction.scala new file mode 100644 index 0000000000..5526258d89 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeSourceFromFunction.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package docs.stream.cookbook + +import java.util.UUID + +import akka.NotUsed +import akka.stream.scaladsl._ + +class RecipeSourceFromFunction extends RecipeSpec { + + "A source that repeatedly evaluates a function" must { + + "be a mapping of Source.repeat" in { + def builderFunction(): String = UUID.randomUUID.toString + + //#source-from-function + val source = Source.repeat(NotUsed).map(_ ⇒ builderFunction()) + //#source-from-function + + val f = source.take(2).runWith(Sink.seq) + f.futureValue.distinct.size should ===(2) + } + } +}