From f2f3f64f21237ab0858ff7060fe7c5687d01d029 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Tue, 26 Nov 2019 17:38:23 +0000 Subject: [PATCH] Example for unfoldResourceAsync (#28080) * Example for unfoldResourceAsync Co-Authored-By: Enno <458526+ennru@users.noreply.github.com> --- .../operators/Source/unfoldResourceAsync.md | 40 +++++++++- .../main/paradox/stream/operators/index.md | 2 +- .../operators/source/UnfoldResourceAsync.java | 73 +++++++++++++++++++ .../source/UnfoldResourceAsync.scala | 55 ++++++++++++++ 4 files changed, 165 insertions(+), 5 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source/unfoldResourceAsync.md b/akka-docs/src/main/paradox/stream/operators/Source/unfoldResourceAsync.md index 75b4bbc4d2..c7afbf94dd 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/unfoldResourceAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/unfoldResourceAsync.md @@ -1,6 +1,6 @@ # Source.unfoldResourceAsync -Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. +Wrap any resource that can be opened, queried for next element and closed in an asynchronous way. @ref[Source operators](../index.md#source-operators) @@ -14,8 +14,41 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa ## Description -Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. -Functions return @scala[`Future`] @java[`CompletionStage`] to achieve asynchronous processing +Wrap any resource that can be opened, queried for next element and closed in an asynchronous way with three distinct functions into a source. This operator is the equivalent of @ref[unfoldResource](unfoldResource.md) but for resources with asynchronous APIs. + +`Source.unfoldResourceAsync` allows us to safely extract stream elements from a resource with an async API by providing it with +three functions that all return a @scala[`Future`]@java[`CompletionStage`]: + +1. `create`: Open or create the resource +1. `read`: Fetch the next element or signal that we reached the end of the stream by completing the @scala[`Future`]@java[`CompletionStage`] with a @java[`Optional.empty`]@scala[`None`] +1. `close`: Close the resource, invoked on end of stream or if the stream fails + +All exceptions thrown by `create` and `close` as well as the @scala[`Future`]@java[`CompletionStage`]s completing with failure will +fail the stream. The supervision strategy is used to handle exceptions from `read`, `create` and from the @scala[`Future`]@java[`CompletionStage`]s. + +Note that there are pre-built `unfoldResourceAsync`-like operators to wrap `java.io.InputStream`s in +@ref:[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters), +`Iterator` in @ref:[fromIterator](fromIterator.md) and File IO in @ref:[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources). +Additional prebuilt technology specific connectors can also be found in the [Alpakka project](https://doc.akka.io/docs/alpakka/current/). + +## Examples + +Imagine we have an async database API which we initially perform an async query and then can +check if there are more results in an asynchronous way. + +Scala +: @@snip [UnfoldResourceAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala) { #unfoldResource-async-api } + +Java +: @@snip [UnfoldResourceAsync.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java) { #unfoldResource-async-api } + +Let's see how we use the API above safely through `unfoldResourceAsync`: + +Scala +: @@snip [UnfoldResourceAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala) { #unfoldResourceAsync } + +Java +: @@snip [UnfoldResource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java) { #unfoldResourceAsync } ## Reactive Streams semantics @@ -26,4 +59,3 @@ Functions return @scala[`Future`] @java[`CompletionStage`] to achieve asynchrono **completes** when @scala[`Future`] @java[`CompletionStage`] from read function returns `None` @@@ - diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index a2b68c7e66..c0a0ec1d1f 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -43,7 +43,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].| |Source|@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].| |Source|@ref[unfoldResource](Source/unfoldResource.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.| -|Source|@ref[unfoldResourceAsync](Source/unfoldResourceAsync.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.| +|Source|@ref[unfoldResourceAsync](Source/unfoldResourceAsync.md)|Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.| |Source|@ref[zipN](Source/zipN.md)|Combine the elements of multiple streams into a stream of sequences.| |Source|@ref[zipWithN](Source/zipWithN.md)|Combine the elements of multiple streams into a stream of sequences using a combiner function.| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java b/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java new file mode 100644 index 0000000000..a3f9f90866 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.Done; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public class UnfoldResourceAsync { + // imaginary async API we need to use + // #unfoldResource-async-api + interface Database { + // async query + CompletionStage doQuery(); + } + + interface QueryResult { + + // are there more results + CompletionStage hasMore(); + + // async retrieval of each element + CompletionStage nextEntry(); + + CompletionStage close(); + } + + interface DatabaseEntry {} + + // #unfoldResource-async-api + + void unfoldResourceExample() { + ActorSystem system = null; + + // #unfoldResourceAsync + // we don't actually have one, it was just made up for the sample + Database database = null; + + Source queryResultSource = + Source.unfoldResourceAsync( + // open + database::doQuery, + // read + this::readQueryResult, + // close + queryResult -> queryResult.close().thenApply(__ -> Done.done())); + + queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system); + // #unfoldResourceAsync + } + + // #unfoldResourceAsync + private CompletionStage> readQueryResult(QueryResult queryResult) { + return queryResult + .hasMore() + .thenCompose( + more -> { + if (more) { + return queryResult.nextEntry().thenApply(Optional::of); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + }); + } + // #unfoldResourceAsync +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala b/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala new file mode 100644 index 0000000000..4e451fd817 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +import scala.concurrent.Future + +object UnfoldResourceAsync { + + // imaginary async API we need to use + // #unfoldResource-async-api + trait Database { + // blocking query + def doQuery(): Future[QueryResult] + } + trait QueryResult { + def hasMore(): Future[Boolean] + def nextEntry(): Future[DatabaseEntry] + def close(): Future[Unit] + } + trait DatabaseEntry + // #unfoldResource-async-api + + def unfoldResourceExample(): Unit = { + implicit val actorSystem = ActorSystem() + implicit val ex = actorSystem.dispatcher + // #unfoldResourceAsync + // we don't actually have one, it was just made up for the sample + val database: Database = ??? + + val queryResultSource: Source[DatabaseEntry, NotUsed] = + Source.unfoldResourceAsync[DatabaseEntry, QueryResult]( + // open + () => database.doQuery(), + // read + query => + query.hasMore().flatMap { + case false => Future.successful(None) + case true => query.nextEntry().map(dbEntry => Some(dbEntry)) + }, + // close + query => query.close().map(_ => Done)) + + // process each element + queryResultSource.runForeach(println) + // #unfoldResourceAsync + } + +}