From 8ba9fda183976fe5f21e4f2ccaab464f90017825 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Thu, 27 Feb 2020 17:59:01 +0100 Subject: [PATCH] Adds example to Source.completionStageSource (#28639) --- .../operators/Source/completionStage.md | 14 ++++------ .../operators/Source/completionStageSource.md | 19 +++++++++++-- .../stream/operators/Source/futureSource.md | 11 +++++++- .../source/CompletionStageSource.java | 28 +++++++++++++++++++ .../operators/source/FutureSource.scala | 28 +++++++++++++++++++ 5 files changed, 89 insertions(+), 11 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/source/CompletionStageSource.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/source/FutureSource.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md b/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md index c95230ffe7..f9af46feeb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md @@ -4,7 +4,7 @@ Send the single value of the `CompletionStage` when it completes and there is de @ref[Source operators](../index.md#source-operators) -@@@div { .group-scala } +@@@div { .group-java } ## Signature @@ -20,6 +20,11 @@ If the `CompletionStage` fails the stream is failed with that exception. For the corresponding operator for the Scala standard library `Future` see @ref:[future](future.md). +## Example + +Java +: @@snip [SourceFromCompletionStage.java](/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java) { #sourceFromCompletionStage } + ## Reactive Streams semantics @@@div { .callout } @@ -29,10 +34,3 @@ For the corresponding operator for the Scala standard library `Future` see @ref: **completes** after the future has completed @@@ - -## Example - -Java -: @@snip [SourceFromCompletionStage.java](/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java) { #sourceFromCompletionStage } - -For the corresponding operator for the Scala standard library `Future` see @ref:[future](future.md). diff --git a/akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md b/akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md index 4bdd85c46d..a7a9edffac 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md @@ -4,13 +4,29 @@ Streams the elements of an asynchronous source once its given *completion* opera @ref[Source operators](../index.md#source-operators) +@@@ div { .group-java } + ## Signature +@apidoc[Source.completionStageSource](Source$) { java="#completionStageSource(java.util.concurrent.CompletionStage)" } + +@@@ + ## Description Streams the elements of an asynchronous source once its given *completion* operator completes. If the *completion* fails the stream is failed with that exception. +For the corresponding operator for the Scala standard library `Future` see @ref:[futureSource](futureSource.md). + +## Example + +Suppose we are accessing a remote service that streams user data over HTTP/2 or a WebSocket. We can model that +as a @apidoc[Source[User,NotUsed]] but that source will only be available once the connection has been established. + +Java +: @@snip [CompletionStageSource.java](/akka-docs/src/test/java/jdocs/stream/operators/source/CompletionStageSource.java) { #sourceCompletionStageSource } + ## Reactive Streams semantics @@@div { .callout } @@ -19,5 +35,4 @@ If the *completion* fails the stream is failed with that exception. **completes** after the asynchronous source completes -@@@ - +@@@ \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source/futureSource.md b/akka-docs/src/main/paradox/stream/operators/Source/futureSource.md index 389f2e86a3..83bdc4abc5 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/futureSource.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/futureSource.md @@ -17,6 +17,16 @@ Streams the elements of the given future source once it successfully completes. Streams the elements of the given future source once it successfully completes. If the future fails the stream is failed. +For the corresponding operator for the Java standard library `CompletionStage` see @ref:[completionStageSource](completionStageSource.md). + +## Example + +Suppose we are accessing a remote service that streams user data over HTTP/2 or a WebSocket. We can model that +as a @apidoc[Source[User,NotUsed]] but that source will only be available once the connection has been established. + +Scala +: @@snip [FutureSource.scala](/akka-docs/src/test/scala/docs/stream/operators/source/FutureSource.scala) { #sourceFutureSource } + ## Reactive Streams semantics @@@div { .callout } @@ -26,4 +36,3 @@ If the future fails the stream is failed. **completes** after the *future* source completes @@@ - diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/CompletionStageSource.java b/akka-docs/src/test/java/jdocs/stream/operators/source/CompletionStageSource.java new file mode 100644 index 0000000000..70ec066c74 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/CompletionStageSource.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +// #sourceCompletionStageSource +import akka.NotUsed; +import akka.stream.javadsl.Source; + +import java.util.concurrent.CompletionStage; + +public class CompletionStageSource { + + public static void sourceCompletionStageSource() { + UserRepository userRepository = null; // an abstraction over the remote service + Source> userCompletionStageSource = + Source.completionStageSource(userRepository.loadUsers()); + // ... + } + + interface UserRepository { + CompletionStage> loadUsers(); + } + + static class User {} +} +// #sourceCompletionStageSource diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/FutureSource.scala b/akka-docs/src/test/scala/docs/stream/operators/source/FutureSource.scala new file mode 100644 index 0000000000..1f944c1990 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/source/FutureSource.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.source + +// #sourceFutureSource + +import akka.NotUsed +import akka.stream.scaladsl.Source + +import scala.concurrent.Future + +object FutureSource { + def sourceCompletionStageSource(): Unit = { + val userRepository: UserRepository = ??? // an abstraction over the remote service + val userFutureSource = Source.futureSource(userRepository.loadUsers) + // ... + } + + trait UserRepository { + def loadUsers: Future[Source[User, NotUsed]] + } + + case class User() +} + +// #sourceFutureSource