From 1a8a438144d89295192c42eeaad0cfdddf89f2bd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 3 Oct 2019 14:09:45 +0200 Subject: [PATCH] doc: Source.maybe, #25468 (#27832) --- .../paradox/stream/operators/Source/maybe.md | 43 ++++++++++++++++--- .../main/paradox/stream/operators/index.md | 2 +- .../stream/operators/SourceDocExamples.java | 40 ++++++++++++++--- .../stream/operators/SourceOperators.scala | 22 ++++++++-- .../scala/akka/stream/javadsl/Source.scala | 4 +- .../scala/akka/stream/scaladsl/Source.scala | 4 +- 6 files changed, 93 insertions(+), 22 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source/maybe.md b/akka-docs/src/main/paradox/stream/operators/Source/maybe.md index c077a2f1be..af8e4e52ed 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/maybe.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/maybe.md @@ -1,22 +1,51 @@ # maybe -Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away. +Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value. @ref[Source operators](../index.md#source-operators) -@@@div { .group-scala } - ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #maybe } +Scala + : @@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #maybe } -@@@ +Java +: @@snip [SourceDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #maybe-signature } ## Description -Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] -will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away. +Create a source with a materialized @scala[`Promise[Option[T]]`] @java[`CompletableFuture>`] which +controls what element will be emitted by the Source. This makes it possible to inject a value into a stream +after creation. +* If the materialized promise is completed with a @scala[`Some`]@java[non-empty `Optional`], + that value will be produced downstream, followed by completion. +* If the materialized promise is completed with a @scala[`None`]@java[empty `Optional`], + no value will be produced downstream and completion will be signalled immediately. +* If the materialized promise is completed with a failure, then the source will fail with that error. +* If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed + with @scala[`None`]@java[empty `Optional`]. + +`Source.maybe` has some similarities with @scala[@ref:[`Source.fromFuture`](fromFuture.md)]@java[@ref:[`Source.fromCompletionStage`](fromCompletionStage.md)]. +One difference is that a new @scala[`Promise`]@java[`CompletableFuture`] is materialized from `Source.maybe` each time +the stream is run while the @scala[`Future`]@java[`CompletionStage`] given to +@scala[`Source.fromFuture`]@java[`Source.fromCompletionStage`] can only be completed once. + +@ref:[`Source.queue`](queue.md) is an alternative for emitting more than one element. + +## Example + +Scala +: @@snip [SourceOperators.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #maybe } + +Java +: @@snip [SourceDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #maybe } + +The `Source.maybe[Int]` will return a @scala[`Promise[Option[Int]]`]@java[`CompletableFuture>`] +materialized value. That @scala[`Promise`]@java[`CompletableFuture`] can be completed later. Each time the stream +is run a new @scala[`Promise`]@java[`CompletableFuture`] is returned. + +## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index d4e74ab6d4..5fc885d36e 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -25,7 +25,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.| |Source|@ref[lazily](Source/lazily.md)|Defers creation and materialization of a `Source` until there is demand.| |Source|@ref[lazilyAsync](Source/lazilyAsync.md)|Defers creation and materialization of a `CompletionStage` until there is demand.| -|Source|@ref[maybe](Source/maybe.md)|Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away.| +|Source|@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.| |Source|@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. | |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index 82be3e0958..4467e0a82b 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -22,6 +22,11 @@ import akka.stream.javadsl.Sink; import akka.testkit.TestProbe; // #actor-ref-imports +// #maybe +import akka.stream.javadsl.RunnableGraph; +import java.util.concurrent.CompletableFuture; +// #maybe + import java.util.Arrays; import java.util.Optional; @@ -32,9 +37,9 @@ public class SourceDocExamples { public static final TestKitJunitResource testKit = new TestKitJunitResource(ManualTime.config()); public static void fromExample() { - // #source-from-example - final ActorSystem system = ActorSystem.create("SourceFromExample"); + final ActorSystem system = null; + // #source-from-example Source ints = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5)); ints.runForeach(System.out::println, system); @@ -71,9 +76,9 @@ public class SourceDocExamples { } static void actorRef() { - // #actor-ref + final ActorSystem system = null; - final ActorSystem system = ActorSystem.create(); + // #actor-ref int bufferSize = 100; Source source = Source.actorRef(bufferSize, OverflowStrategy.dropHead()); @@ -89,10 +94,9 @@ public class SourceDocExamples { static void actorRefWithBackpressure() { final TestProbe probe = null; + final ActorSystem system = null; // #actorRefWithBackpressure - final ActorSystem system = ActorSystem.create(); - Source source = Source.actorRefWithBackpressure( "ack", @@ -112,4 +116,28 @@ public class SourceDocExamples { actorRef.tell("complete", ActorRef.noSender()); // #actorRefWithBackpressure } + + static void maybeExample() { + final ActorSystem system = null; + + // #maybe + Source>> source = Source.maybe(); + RunnableGraph>> runnable = + source.to(Sink.foreach(System.out::println)); + + CompletableFuture> completable1 = runnable.run(system); + completable1.complete(Optional.of(1)); // prints 1 + + CompletableFuture> completable2 = runnable.run(system); + completable2.complete(Optional.of(2)); // prints 2 + // #maybe + } + + static + // #maybe-signature + Source>> maybe() + // #maybe-signature + { + return Source.maybe(); + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala index 15036c486e..a70facc182 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -9,6 +9,8 @@ import akka.testkit.TestProbe object SourceOperators { + implicit val system: ActorSystem = ??? + def fromFuture = { //#sourceFromFuture @@ -18,8 +20,6 @@ object SourceOperators { import scala.concurrent.Future - implicit val system: ActorSystem = ActorSystem() - val source: Source[Int, NotUsed] = Source.fromFuture(Future.successful(10)) val sink: Sink[Int, Future[Done]] = Sink.foreach((i: Int) => println(i)) @@ -36,7 +36,6 @@ object SourceOperators { import akka.stream.CompletionStrategy import akka.stream.scaladsl._ - implicit val system: ActorSystem = ActorSystem() val bufferSize = 100 val source: Source[Any, ActorRef] = Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead) @@ -58,7 +57,6 @@ object SourceOperators { import akka.stream.CompletionStrategy import akka.stream.scaladsl._ - implicit val system: ActorSystem = ActorSystem() val probe = TestProbe() val source: Source[Any, ActorRef] = Source.actorRefWithBackpressure[Any]("ack", { @@ -75,4 +73,20 @@ object SourceOperators { actorRef ! Success(()) //#actorRefWithBackpressure } + + def maybe(): Unit = { + //#maybe + import akka.stream.scaladsl._ + import scala.concurrent.Promise + + val source = Source.maybe[Int].to(Sink.foreach(elem => println(elem))) + + val promise1: Promise[Option[Int]] = source.run() + promise1.success(Some(1)) // prints 1 + + // a new Promise is returned when the stream is materialized + val promise2 = source.run() + promise2.success(Some(2)) // prints 2 + //#maybe + } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 5cf82b6fce..b7a20787ab 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -51,8 +51,8 @@ object Source { * followed by completion. * If the materialized promise is completed with an empty Optional, no value will be produced downstream and completion will * be signalled immediately. - * If the materialized promise is completed with a failure, then the returned source will terminate with that error. - * If the downstream of this source cancels before the promise has been completed, then the promise will be completed + * If the materialized promise is completed with a failure, then the source will fail with that error. + * If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed * with an empty Optional. */ def maybe[T]: Source[T, CompletableFuture[Optional[T]]] = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5ec4952810..a721a046a0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -444,8 +444,8 @@ object Source { * followed by completion. * If the materialized promise is completed with a None, no value will be produced downstream and completion will * be signalled immediately. - * If the materialized promise is completed with a failure, then the returned source will terminate with that error. - * If the downstream of this source cancels before the promise has been completed, then the promise will be completed + * If the materialized promise is completed with a failure, then the source will fail with that error. + * If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed * with None. */ def maybe[T]: Source[T, Promise[Option[T]]] =