diff --git a/akka-docs/src/main/paradox/stream/operators/Source/unfoldResource.md b/akka-docs/src/main/paradox/stream/operators/Source/unfoldResource.md index 53026811c7..fc2ebd7a6a 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/unfoldResource.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/unfoldResource.md @@ -14,15 +14,53 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa ## Description -Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. +`Source.unfoldResource` allows us to safely extract stream elements from blocking resources by providing it with three functions: + +1. `create`: Open or create the resource +1. `read`: Fetch the next element or signal that we reached the end of the stream by returning a @java[`Optional.empty`]@scala[`None`] +1. `close`: Close the resource, invoked on end of stream or if the stream fails + +The functions are by default called on Akka's dispatcher for blocking IO to avoid interfering with other stream operations. +See @ref:[Blocking Needs Careful Management](../../../typed/dispatchers.md#blocking-needs-careful-management) for an explanation on why this is important. + +Note that there are pre-built `unfoldResource`-like operators to wrap `java.io.InputStream`s in +@ref:[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters), +`Iterator` in @ref:[fromIterator](fromIterator.md) and File IO in @ref:[File IO Sinks and Sources](../index.md#file-io-sinks-and-sources). +Additional prebuilt technology specific connectors can also be found in the [Alpakka project](https://doc.akka.io/docs/alpakka/current/). + +## Examples + +Imagine we have a database API which may potentially block both when we initially perform a query and +on retrieving each result from the query. It also gives us an iterator like way to determine if we have reached +the end of the result and a close method that must be called to free resources: + +Scala +: @@snip [UnfoldResource.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResource.scala) { #unfoldResource-blocking-api } + +Java +: @@snip [UnfoldResource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResource.java) { #unfoldResource-blocking-api } + +## Reactive Streams semantics + + +Let's see how we use the API above safely through `unfoldResource`: + +Scala +: @@snip [UnfoldResource.scala](/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResource.scala) { #unfoldResource } + +Java +: @@snip [UnfoldResource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResource.java) { #unfoldResource } + +If the resource produces more than one element at a time, combining `unfoldResource` with +@scala[`mapConcat(identity)`]@java[`mapConcat(elems -> elems)`] will give you a stream of individual elements. +See @ref:[mapConcat](../Source-or-Flow/mapConcat.md)) for details. ## Reactive Streams semantics @@@div { .callout } -**emits** when there is demand and read @scala[function] @java[method] returns value +**emits** when there is demand and the `read` function returns a value -**completes** when read function returns `None` +**completes** when the `read` function returns @scala[`None`]@java[an empty `Optional`] @@@ - diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResource.java b/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResource.java new file mode 100644 index 0000000000..5c616aa00a --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/UnfoldResource.java @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Optional; + +interface UnfoldResource { + // imaginary blocking API we need to use + // #unfoldResource-blocking-api + interface Database { + // blocking query + QueryResult doQuery(); + } + + interface QueryResult { + boolean hasMore(); + // potentially blocking retrieval of each element + DatabaseEntry nextEntry(); + + void close(); + } + + interface DatabaseEntry {} + + // #unfoldResource-blocking-api + + default void unfoldResourceExample() { + ActorSystem system = null; + + // #unfoldResource + // we don't actually have one, it was just made up for the sample + Database database = null; + + Source queryResultSource = + Source.unfoldResource( + // open + () -> database.doQuery(), + // read + (queryResult) -> { + if (queryResult.hasMore()) return Optional.of(queryResult.nextEntry()); + else return Optional.empty(); + }, + // close + QueryResult::close); + + queryResultSource.runForeach(entry -> System.out.println(entry.toString()), system); + // #unfoldResource + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResource.scala b/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResource.scala new file mode 100644 index 0000000000..22d3eb1c12 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/UnfoldResource.scala @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.source + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +object UnfoldResource { + + // imaginary blocking API we need to use + // #unfoldResource-blocking-api + trait Database { + // blocking query + def doQuery(): QueryResult + } + trait QueryResult { + def hasMore: Boolean + // potentially blocking retrieval of each element + def nextEntry(): DatabaseEntry + def close(): Unit + } + trait DatabaseEntry + // #unfoldResource-blocking-api + + def unfoldResourceExample(): Unit = { + implicit val actorSystem = ActorSystem() + // #unfoldResource + // we don't actually have one, it was just made up for the sample + val database: Database = ??? + + val queryResultSource: Source[DatabaseEntry, NotUsed] = + Source.unfoldResource[DatabaseEntry, QueryResult]( + // open + { () => + database.doQuery() + }, + // read + { query => + if (query.hasMore) + Some(query.nextEntry()) + else + // signals end of resource + None + }, + // close + query => query.close()) + + // process each element + queryResultSource.runForeach(println) + // #unfoldResource + } + +}