Adds example to Source.completionStageSource (#28639)

This commit is contained in:
Ignasi Marimon-Clos 2020-02-27 17:59:01 +01:00 committed by GitHub
parent 26c333b52c
commit 8ba9fda183
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 89 additions and 11 deletions

View file

@ -4,7 +4,7 @@ Send the single value of the `CompletionStage` when it completes and there is de
@ref[Source operators](../index.md#source-operators)
@@@div { .group-scala }
@@@div { .group-java }
## Signature
@ -20,6 +20,11 @@ If the `CompletionStage` fails the stream is failed with that exception.
For the corresponding operator for the Scala standard library `Future` see @ref:[future](future.md).
## Example
Java
: @@snip [SourceFromCompletionStage.java](/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java) { #sourceFromCompletionStage }
## Reactive Streams semantics
@@@div { .callout }
@ -29,10 +34,3 @@ For the corresponding operator for the Scala standard library `Future` see @ref:
**completes** after the future has completed
@@@
## Example
Java
: @@snip [SourceFromCompletionStage.java](/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java) { #sourceFromCompletionStage }
For the corresponding operator for the Scala standard library `Future` see @ref:[future](future.md).

View file

@ -4,13 +4,29 @@ Streams the elements of an asynchronous source once its given *completion* opera
@ref[Source operators](../index.md#source-operators)
@@@ div { .group-java }
## Signature
@apidoc[Source.completionStageSource](Source$) { java="#completionStageSource(java.util.concurrent.CompletionStage)" }
@@@
## Description
Streams the elements of an asynchronous source once its given *completion* operator completes.
If the *completion* fails the stream is failed with that exception.
For the corresponding operator for the Scala standard library `Future` see @ref:[futureSource](futureSource.md).
## Example
Suppose we are accessing a remote service that streams user data over HTTP/2 or a WebSocket. We can model that
as a @apidoc[Source[User,NotUsed]] but that source will only be available once the connection has been established.
Java
: @@snip [CompletionStageSource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/CompletionStageSource.java) { #sourceCompletionStageSource }
## Reactive Streams semantics
@@@div { .callout }
@ -20,4 +36,3 @@ If the *completion* fails the stream is failed with that exception.
**completes** after the asynchronous source completes
@@@

View file

@ -17,6 +17,16 @@ Streams the elements of the given future source once it successfully completes.
Streams the elements of the given future source once it successfully completes.
If the future fails the stream is failed.
For the corresponding operator for the Java standard library `CompletionStage` see @ref:[completionStageSource](completionStageSource.md).
## Example
Suppose we are accessing a remote service that streams user data over HTTP/2 or a WebSocket. We can model that
as a @apidoc[Source[User,NotUsed]] but that source will only be available once the connection has been established.
Scala
: @@snip [FutureSource.scala](/akka-docs/src/test/scala/docs/stream/operators/source/FutureSource.scala) { #sourceFutureSource }
## Reactive Streams semantics
@@@div { .callout }
@ -26,4 +36,3 @@ If the future fails the stream is failed.
**completes** after the *future* source completes
@@@

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.source;
// #sourceCompletionStageSource
import akka.NotUsed;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;
public class CompletionStageSource {
public static void sourceCompletionStageSource() {
UserRepository userRepository = null; // an abstraction over the remote service
Source<User, CompletionStage<NotUsed>> userCompletionStageSource =
Source.completionStageSource(userRepository.loadUsers());
// ...
}
interface UserRepository {
CompletionStage<Source<User, NotUsed>> loadUsers();
}
static class User {}
}
// #sourceCompletionStageSource

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.source
// #sourceFutureSource
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.Future
object FutureSource {
def sourceCompletionStageSource(): Unit = {
val userRepository: UserRepository = ??? // an abstraction over the remote service
val userFutureSource = Source.futureSource(userRepository.loadUsers)
// ...
}
trait UserRepository {
def loadUsers: Future[Source[User, NotUsed]]
}
case class User()
}
// #sourceFutureSource