diff --git a/akka-docs/src/main/paradox/stream/operators/Source/unfoldAsync.md b/akka-docs/src/main/paradox/stream/operators/Source/unfoldAsync.md index 8d2d655cae..87cc13ee14 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/unfoldAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/unfoldAsync.md @@ -19,6 +19,28 @@ complete or emit when it completes. Can be used to implement many stateful sources without having to touch the more low level @ref[`GraphStage`](../../stream-customize.md) API. +## Examples + +In this example we are asking an imaginary actor for chunks of bytes from an offset with a protocol like this: + +Scala +: @@snip [UnfoldAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldAsync.scala) { #unfoldAsync-actor-protocol } + +Java +: @@snip [UnfoldAsync.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldAsync.java) { #unfoldAsync-actor-protocol } + + +The actor will reply with the `Chunk` message, if we ask for an offset outside of the end of the data the actor will respond with an empty `ByteString` + +We want to represent this as a stream of `ByteString`s that complete when we reach the end, to achieve this we use the offset as the state passed between `unfoldAsync` invocations: + +Scala +: @@snip [UnfoldAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldAsync.scala) { #unfoldAsync } + +Java +: @@snip [UnfoldAsync.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldAsync.java) { #unfoldAsync } + + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldAsync.java b/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldAsync.java new file mode 100644 index 0000000000..d2b5019907 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldAsync.java @@ -0,0 +1,75 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.NotUsed; +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.japi.Pair; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import akka.util.Timeout; +import akka.actor.typed.javadsl.AskPattern; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + +interface UnfoldAsync { + + // #unfoldAsync-actor-protocol + class DataActor { + interface Command {} + + static final class FetchChunk implements Command { + public final long offset; + public final ActorRef replyTo; + + public FetchChunk(long offset, ActorRef replyTo) { + this.offset = offset; + this.replyTo = replyTo; + } + } + + static final class Chunk { + public final ByteString bytes; + + public Chunk(ByteString bytes) { + this.bytes = bytes; + } + } + // #unfoldAsync-actor-protocol + } + + default void unfoldAsyncSample() { + ActorSystem system = null; + // #unfoldAsync + ActorRef dataActor = null; // let's say we got it from somewhere + + Duration askTimeout = Duration.ofSeconds(3); + long startOffset = 0L; + Source byteSource = + Source.unfoldAsync( + startOffset, + currentOffset -> { + // ask for next chunk + CompletionStage nextChunkCS = + AskPattern.ask( + dataActor, + (ActorRef ref) -> + new DataActor.FetchChunk(currentOffset, ref), + askTimeout, + system.scheduler()); + + return nextChunkCS.thenApply( + chunk -> { + ByteString bytes = chunk.bytes; + if (bytes.isEmpty()) return Optional.empty(); + else return Optional.of(Pair.create(currentOffset + bytes.size(), bytes)); + }); + }); + // #unfoldAsync + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldAsync.scala b/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldAsync.scala new file mode 100644 index 0000000000..c6965dbfda --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldAsync.scala @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.NotUsed +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.stream.scaladsl.Source +import akka.util.ByteString +import akka.actor.typed.scaladsl.AskPattern._ +import akka.util.Timeout + +import scala.concurrent.Future +import scala.concurrent.duration._ + +object UnfoldAsync { + + // #unfoldAsync-actor-protocol + object DataActor { + sealed trait Command + case class FetchChunk(offset: Long, replyTo: ActorRef[Chunk]) extends Command + case class Chunk(bytes: ByteString) + // #unfoldAsync-actor-protocol + + } + implicit val system: ActorSystem[Nothing] = ??? + + def unfoldAsyncExample(): Unit = { + // #unfoldAsync + // actor we can query for data with an offset + val dataActor: ActorRef[DataActor.Command] = ??? + import system.executionContext + + implicit val askTimeout: Timeout = 3.seconds + val startOffset = 0L + val byteSource: Source[ByteString, NotUsed] = + Source.unfoldAsync(startOffset) { currentOffset => + // ask for next chunk + val nextChunkFuture: Future[DataActor.Chunk] = + dataActor.ask(DataActor.FetchChunk(currentOffset, _)) + + nextChunkFuture.map { chunk => + val bytes = chunk.bytes + if (bytes.isEmpty) None // end of data + else Some((currentOffset + bytes.length, bytes)) + } + } + // #unfoldAsync + } + +}