Example for unfoldResourceAsync (#28080)

* Example for unfoldResourceAsync

Co-Authored-By: Enno <458526+ennru@users.noreply.github.com>
This commit is contained in:
Christopher Batey 2019-11-26 17:38:23 +00:00 committed by Arnout Engelen
parent 5f21c2264b
commit f2f3f64f21
4 changed files with 165 additions and 5 deletions

View file

@ -1,6 +1,6 @@
# Source.unfoldResourceAsync # Source.unfoldResourceAsync
Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.
@ref[Source operators](../index.md#source-operators) @ref[Source operators](../index.md#source-operators)
@ -14,8 +14,41 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa
## Description ## Description
Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. Wrap any resource that can be opened, queried for next element and closed in an asynchronous way with three distinct functions into a source. This operator is the equivalent of @ref[unfoldResource](unfoldResource.md) but for resources with asynchronous APIs.
Functions return @scala[`Future`] @java[`CompletionStage`] to achieve asynchronous processing
`Source.unfoldResourceAsync` allows us to safely extract stream elements from a resource with an async API by providing it with
three functions that all return a @scala[`Future`]@java[`CompletionStage`]:
1. `create`: Open or create the resource
1. `read`: Fetch the next element or signal that we reached the end of the stream by completing the @scala[`Future`]@java[`CompletionStage`] with a @java[`Optional.empty`]@scala[`None`]
1. `close`: Close the resource, invoked on end of stream or if the stream fails
All exceptions thrown by `create` and `close` as well as the @scala[`Future`]@java[`CompletionStage`]s completing with failure will
fail the stream. The supervision strategy is used to handle exceptions from `read`, `create` and from the @scala[`Future`]@java[`CompletionStage`]s.
Note that there are pre-built `unfoldResourceAsync`-like operators to wrap `java.io.InputStream`s in
@ref:[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters),
`Iterator` in @ref:[fromIterator](fromIterator.md) and File IO in @ref:[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources).
Additional prebuilt technology specific connectors can also be found in the [Alpakka project](https://doc.akka.io/docs/alpakka/current/).
## Examples
Imagine we have an async database API which we initially perform an async query and then can
check if there are more results in an asynchronous way.
Scala
: @@snip [UnfoldResourceAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala) { #unfoldResource-async-api }
Java
: @@snip [UnfoldResourceAsync.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java) { #unfoldResource-async-api }
Let's see how we use the API above safely through `unfoldResourceAsync`:
Scala
: @@snip [UnfoldResourceAsync.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResourceAsync.scala) { #unfoldResourceAsync }
Java
: @@snip [UnfoldResource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResourceAsync.java) { #unfoldResourceAsync }
## Reactive Streams semantics ## Reactive Streams semantics
@ -26,4 +59,3 @@ Functions return @scala[`Future`] @java[`CompletionStage`] to achieve asynchrono
**completes** when @scala[`Future`] @java[`CompletionStage`] from read function returns `None` **completes** when @scala[`Future`] @java[`CompletionStage`] from read function returns `None`
@@@ @@@

View file

@ -43,7 +43,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|<a name="unfold"></a>@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].| |Source|<a name="unfold"></a>@ref[unfold](Source/unfold.md)|Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`].|
|Source|<a name="unfoldasync"></a>@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].| |Source|<a name="unfoldasync"></a>@ref[unfoldAsync](Source/unfoldAsync.md)|Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`].|
|Source|<a name="unfoldresource"></a>@ref[unfoldResource](Source/unfoldResource.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.| |Source|<a name="unfoldresource"></a>@ref[unfoldResource](Source/unfoldResource.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.|
|Source|<a name="unfoldresourceasync"></a>@ref[unfoldResourceAsync](Source/unfoldResourceAsync.md)|Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.| |Source|<a name="unfoldresourceasync"></a>@ref[unfoldResourceAsync](Source/unfoldResourceAsync.md)|Wrap any resource that can be opened, queried for next element and closed in an asynchronous way.|
|Source|<a name="zipn"></a>@ref[zipN](Source/zipN.md)|Combine the elements of multiple streams into a stream of sequences.| |Source|<a name="zipn"></a>@ref[zipN](Source/zipN.md)|Combine the elements of multiple streams into a stream of sequences.|
|Source|<a name="zipwithn"></a>@ref[zipWithN](Source/zipWithN.md)|Combine the elements of multiple streams into a stream of sequences using a combiner function.| |Source|<a name="zipwithn"></a>@ref[zipWithN](Source/zipWithN.md)|Combine the elements of multiple streams into a stream of sequences using a combiner function.|

View file

@ -0,0 +1,73 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.source;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class UnfoldResourceAsync {
// imaginary async API we need to use
// #unfoldResource-async-api
interface Database {
// async query
CompletionStage<QueryResult> doQuery();
}
interface QueryResult {
// are there more results
CompletionStage<Boolean> hasMore();
// async retrieval of each element
CompletionStage<DatabaseEntry> nextEntry();
CompletionStage<Void> close();
}
interface DatabaseEntry {}
// #unfoldResource-async-api
void unfoldResourceExample() {
ActorSystem system = null;
// #unfoldResourceAsync
// we don't actually have one, it was just made up for the sample
Database database = null;
Source<DatabaseEntry, NotUsed> queryResultSource =
Source.unfoldResourceAsync(
// open
database::doQuery,
// read
this::readQueryResult,
// close
queryResult -> queryResult.close().thenApply(__ -> Done.done()));
queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system);
// #unfoldResourceAsync
}
// #unfoldResourceAsync
private CompletionStage<Optional<DatabaseEntry>> readQueryResult(QueryResult queryResult) {
return queryResult
.hasMore()
.thenCompose(
more -> {
if (more) {
return queryResult.nextEntry().thenApply(Optional::of);
} else {
return CompletableFuture.completedFuture(Optional.empty());
}
});
}
// #unfoldResourceAsync
}

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.source
import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Future
object UnfoldResourceAsync {
// imaginary async API we need to use
// #unfoldResource-async-api
trait Database {
// blocking query
def doQuery(): Future[QueryResult]
}
trait QueryResult {
def hasMore(): Future[Boolean]
def nextEntry(): Future[DatabaseEntry]
def close(): Future[Unit]
}
trait DatabaseEntry
// #unfoldResource-async-api
def unfoldResourceExample(): Unit = {
implicit val actorSystem = ActorSystem()
implicit val ex = actorSystem.dispatcher
// #unfoldResourceAsync
// we don't actually have one, it was just made up for the sample
val database: Database = ???
val queryResultSource: Source[DatabaseEntry, NotUsed] =
Source.unfoldResourceAsync[DatabaseEntry, QueryResult](
// open
() => database.doQuery(),
// read
query =>
query.hasMore().flatMap {
case false => Future.successful(None)
case true => query.nextEntry().map(dbEntry => Some(dbEntry))
},
// close
query => query.close().map(_ => Done))
// process each element
queryResultSource.runForeach(println)
// #unfoldResourceAsync
}
}