Add a sample for Source.unfoldResource #25468

This commit is contained in:
Johan Andrén 2019-10-05 12:28:07 +02:00 committed by GitHub
parent 023b379e3c
commit 7102c4744d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 153 additions and 4 deletions

View file

@ -14,15 +14,53 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa
## Description
Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.
`Source.unfoldResource` allows us to safely extract stream elements from blocking resources by providing it with three functions:
1. `create`: Open or create the resource
1. `read`: Fetch the next element or signal that we reached the end of the stream by returning a @java[`Optional.empty`]@scala[`None`]
1. `close`: Close the resource, invoked on end of stream or if the stream fails
The functions are by default called on Akka's dispatcher for blocking IO to avoid interfering with other stream operations.
See @ref:[Blocking Needs Careful Management](../../../typed/dispatchers.md#blocking-needs-careful-management) for an explanation on why this is important.
Note that there are pre-built `unfoldResource`-like operators to wrap `java.io.InputStream`s in
@ref:[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters),
`Iterator` in @ref:[fromIterator](fromIterator.md) and File IO in @ref:[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources).
Additional prebuilt technology specific connectors can also be found in the [Alpakka project](https://doc.akka.io/docs/alpakka/current/).
## Examples
Imagine we have a database API which may potentially block both when we initially perform a query and
on retrieving each result from the query. It also gives us an iterator like way to determine if we have reached
the end of the result and a close method that must be called to free resources:
Scala
: @@snip [UnfoldResource.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResource.scala) { #unfoldResource-blocking-api }
Java
: @@snip [UnfoldResource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResource.java) { #unfoldResource-blocking-api }
## Reactive Streams semantics
Let's see how we use the API above safely through `unfoldResource`:
Scala
: @@snip [UnfoldResource.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResource.scala) { #unfoldResource }
Java
: @@snip [UnfoldResource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResource.java) { #unfoldResource }
If the resource produces more than one element at a time, combining `unfoldResource` with
@scala[`mapConcat(identity)`]@java[`mapConcat(elems -> elems)`] will give you a stream of individual elements.
See @ref:[mapConcat](../Source-or-Flow/mapConcat.md)) for details.
## Reactive Streams semantics
@@@div { .callout }
**emits** when there is demand and read @scala[function] @java[method] returns value
**emits** when there is demand and the `read` function returns a value
**completes** when read function returns `None`
**completes** when the `read` function returns @scala[`None`]@java[an empty `Optional`]
@@@

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.source;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.Optional;
interface UnfoldResource {
// imaginary blocking API we need to use
// #unfoldResource-blocking-api
interface Database {
// blocking query
QueryResult doQuery();
}
interface QueryResult {
boolean hasMore();
// potentially blocking retrieval of each element
DatabaseEntry nextEntry();
void close();
}
interface DatabaseEntry {}
// #unfoldResource-blocking-api
default void unfoldResourceExample() {
ActorSystem system = null;
// #unfoldResource
// we don't actually have one, it was just made up for the sample
Database database = null;
Source<DatabaseEntry, NotUsed> queryResultSource =
Source.unfoldResource(
// open
() -> database.doQuery(),
// read
(queryResult) -> {
if (queryResult.hasMore()) return Optional.of(queryResult.nextEntry());
else return Optional.empty();
},
// close
QueryResult::close);
queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system);
// #unfoldResource
}
}

View file

@ -0,0 +1,56 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.source
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
object UnfoldResource {
// imaginary blocking API we need to use
// #unfoldResource-blocking-api
trait Database {
// blocking query
def doQuery(): QueryResult
}
trait QueryResult {
def hasMore: Boolean
// potentially blocking retrieval of each element
def nextEntry(): DatabaseEntry
def close(): Unit
}
trait DatabaseEntry
// #unfoldResource-blocking-api
def unfoldResourceExample(): Unit = {
implicit val actorSystem = ActorSystem()
// #unfoldResource
// we don't actually have one, it was just made up for the sample
val database: Database = ???
val queryResultSource: Source[DatabaseEntry, NotUsed] =
Source.unfoldResource[DatabaseEntry, QueryResult](
// open
{ () =>
database.doQuery()
},
// read
{ query =>
if (query.hasMore)
Some(query.nextEntry())
else
// signals end of resource
None
},
// close
query => query.close())
// process each element
queryResultSource.runForeach(println)
// #unfoldResource
}
}