parent
a217d5566e
commit
1a8a438144
6 changed files with 93 additions and 22 deletions
|
|
@ -1,22 +1,51 @@
|
|||
# maybe
|
||||
|
||||
Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away.
|
||||
Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.
|
||||
|
||||
@ref[Source operators](../index.md#source-operators)
|
||||
|
||||
@@@div { .group-scala }
|
||||
|
||||
## Signature
|
||||
|
||||
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #maybe }
|
||||
Scala
|
||||
: @@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #maybe }
|
||||
|
||||
@@@
|
||||
Java
|
||||
: @@snip [SourceDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #maybe-signature }
|
||||
|
||||
## Description
|
||||
|
||||
Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`]
|
||||
will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away.
|
||||
Create a source with a materialized @scala[`Promise[Option[T]]`] @java[`CompletableFuture<Optional<T>>`] which
|
||||
controls what element will be emitted by the Source. This makes it possible to inject a value into a stream
|
||||
after creation.
|
||||
|
||||
* If the materialized promise is completed with a @scala[`Some`]@java[non-empty `Optional`],
|
||||
that value will be produced downstream, followed by completion.
|
||||
* If the materialized promise is completed with a @scala[`None`]@java[empty `Optional`],
|
||||
no value will be produced downstream and completion will be signalled immediately.
|
||||
* If the materialized promise is completed with a failure, then the source will fail with that error.
|
||||
* If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed
|
||||
with @scala[`None`]@java[empty `Optional`].
|
||||
|
||||
`Source.maybe` has some similarities with @scala[@ref:[`Source.fromFuture`](fromFuture.md)]@java[@ref:[`Source.fromCompletionStage`](fromCompletionStage.md)].
|
||||
One difference is that a new @scala[`Promise`]@java[`CompletableFuture`] is materialized from `Source.maybe` each time
|
||||
the stream is run while the @scala[`Future`]@java[`CompletionStage`] given to
|
||||
@scala[`Source.fromFuture`]@java[`Source.fromCompletionStage`] can only be completed once.
|
||||
|
||||
@ref:[`Source.queue`](queue.md) is an alternative for emitting more than one element.
|
||||
|
||||
## Example
|
||||
|
||||
Scala
|
||||
: @@snip [SourceOperators.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #maybe }
|
||||
|
||||
Java
|
||||
: @@snip [SourceDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #maybe }
|
||||
|
||||
The `Source.maybe[Int]` will return a @scala[`Promise[Option[Int]]`]@java[`CompletableFuture<Optional<Integer>>`]
|
||||
materialized value. That @scala[`Promise`]@java[`CompletableFuture`] can be completed later. Each time the stream
|
||||
is run a new @scala[`Promise`]@java[`CompletableFuture`] is returned.
|
||||
|
||||
## Reactive Streams semantics
|
||||
|
||||
@@@div { .callout }
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|
|||
|Source|<a name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.|
|
||||
|Source|<a name="lazily"></a>@ref[lazily](Source/lazily.md)|Defers creation and materialization of a `Source` until there is demand.|
|
||||
|Source|<a name="lazilyasync"></a>@ref[lazilyAsync](Source/lazilyAsync.md)|Defers creation and materialization of a `CompletionStage` until there is demand.|
|
||||
|Source|<a name="maybe"></a>@ref[maybe](Source/maybe.md)|Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`] will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away.|
|
||||
|Source|<a name="maybe"></a>@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.|
|
||||
|Source|<a name="queue"></a>@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. |
|
||||
|Source|<a name="range"></a>@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.|
|
||||
|Source|<a name="repeat"></a>@ref[repeat](Source/repeat.md)|Stream a single object repeatedly|
|
||||
|
|
|
|||
|
|
@ -22,6 +22,11 @@ import akka.stream.javadsl.Sink;
|
|||
import akka.testkit.TestProbe;
|
||||
// #actor-ref-imports
|
||||
|
||||
// #maybe
|
||||
import akka.stream.javadsl.RunnableGraph;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
// #maybe
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
|
||||
|
|
@ -32,9 +37,9 @@ public class SourceDocExamples {
|
|||
public static final TestKitJunitResource testKit = new TestKitJunitResource(ManualTime.config());
|
||||
|
||||
public static void fromExample() {
|
||||
// #source-from-example
|
||||
final ActorSystem system = ActorSystem.create("SourceFromExample");
|
||||
final ActorSystem system = null;
|
||||
|
||||
// #source-from-example
|
||||
Source<Integer, NotUsed> ints = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5));
|
||||
ints.runForeach(System.out::println, system);
|
||||
|
||||
|
|
@ -71,9 +76,9 @@ public class SourceDocExamples {
|
|||
}
|
||||
|
||||
static void actorRef() {
|
||||
// #actor-ref
|
||||
final ActorSystem system = null;
|
||||
|
||||
final ActorSystem system = ActorSystem.create();
|
||||
// #actor-ref
|
||||
|
||||
int bufferSize = 100;
|
||||
Source<Object, ActorRef> source = Source.actorRef(bufferSize, OverflowStrategy.dropHead());
|
||||
|
|
@ -89,10 +94,9 @@ public class SourceDocExamples {
|
|||
|
||||
static void actorRefWithBackpressure() {
|
||||
final TestProbe probe = null;
|
||||
final ActorSystem system = null;
|
||||
|
||||
// #actorRefWithBackpressure
|
||||
final ActorSystem system = ActorSystem.create();
|
||||
|
||||
Source<Object, ActorRef> source =
|
||||
Source.actorRefWithBackpressure(
|
||||
"ack",
|
||||
|
|
@ -112,4 +116,28 @@ public class SourceDocExamples {
|
|||
actorRef.tell("complete", ActorRef.noSender());
|
||||
// #actorRefWithBackpressure
|
||||
}
|
||||
|
||||
static void maybeExample() {
|
||||
final ActorSystem system = null;
|
||||
|
||||
// #maybe
|
||||
Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe();
|
||||
RunnableGraph<CompletableFuture<Optional<Integer>>> runnable =
|
||||
source.to(Sink.foreach(System.out::println));
|
||||
|
||||
CompletableFuture<Optional<Integer>> completable1 = runnable.run(system);
|
||||
completable1.complete(Optional.of(1)); // prints 1
|
||||
|
||||
CompletableFuture<Optional<Integer>> completable2 = runnable.run(system);
|
||||
completable2.complete(Optional.of(2)); // prints 2
|
||||
// #maybe
|
||||
}
|
||||
|
||||
static
|
||||
// #maybe-signature
|
||||
<Out> Source<Out, CompletableFuture<Optional<Out>>> maybe()
|
||||
// #maybe-signature
|
||||
{
|
||||
return Source.maybe();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import akka.testkit.TestProbe
|
|||
|
||||
object SourceOperators {
|
||||
|
||||
implicit val system: ActorSystem = ???
|
||||
|
||||
def fromFuture = {
|
||||
//#sourceFromFuture
|
||||
|
||||
|
|
@ -18,8 +20,6 @@ object SourceOperators {
|
|||
|
||||
import scala.concurrent.Future
|
||||
|
||||
implicit val system: ActorSystem = ActorSystem()
|
||||
|
||||
val source: Source[Int, NotUsed] = Source.fromFuture(Future.successful(10))
|
||||
val sink: Sink[Int, Future[Done]] = Sink.foreach((i: Int) => println(i))
|
||||
|
||||
|
|
@ -36,7 +36,6 @@ object SourceOperators {
|
|||
import akka.stream.CompletionStrategy
|
||||
import akka.stream.scaladsl._
|
||||
|
||||
implicit val system: ActorSystem = ActorSystem()
|
||||
val bufferSize = 100
|
||||
|
||||
val source: Source[Any, ActorRef] = Source.actorRef[Any](bufferSize, OverflowStrategy.dropHead)
|
||||
|
|
@ -58,7 +57,6 @@ object SourceOperators {
|
|||
import akka.stream.CompletionStrategy
|
||||
import akka.stream.scaladsl._
|
||||
|
||||
implicit val system: ActorSystem = ActorSystem()
|
||||
val probe = TestProbe()
|
||||
|
||||
val source: Source[Any, ActorRef] = Source.actorRefWithBackpressure[Any]("ack", {
|
||||
|
|
@ -75,4 +73,20 @@ object SourceOperators {
|
|||
actorRef ! Success(())
|
||||
//#actorRefWithBackpressure
|
||||
}
|
||||
|
||||
def maybe(): Unit = {
|
||||
//#maybe
|
||||
import akka.stream.scaladsl._
|
||||
import scala.concurrent.Promise
|
||||
|
||||
val source = Source.maybe[Int].to(Sink.foreach(elem => println(elem)))
|
||||
|
||||
val promise1: Promise[Option[Int]] = source.run()
|
||||
promise1.success(Some(1)) // prints 1
|
||||
|
||||
// a new Promise is returned when the stream is materialized
|
||||
val promise2 = source.run()
|
||||
promise2.success(Some(2)) // prints 2
|
||||
//#maybe
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,8 +51,8 @@ object Source {
|
|||
* followed by completion.
|
||||
* If the materialized promise is completed with an empty Optional, no value will be produced downstream and completion will
|
||||
* be signalled immediately.
|
||||
* If the materialized promise is completed with a failure, then the returned source will terminate with that error.
|
||||
* If the downstream of this source cancels before the promise has been completed, then the promise will be completed
|
||||
* If the materialized promise is completed with a failure, then the source will fail with that error.
|
||||
* If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed
|
||||
* with an empty Optional.
|
||||
*/
|
||||
def maybe[T]: Source[T, CompletableFuture[Optional[T]]] = {
|
||||
|
|
|
|||
|
|
@ -444,8 +444,8 @@ object Source {
|
|||
* followed by completion.
|
||||
* If the materialized promise is completed with a None, no value will be produced downstream and completion will
|
||||
* be signalled immediately.
|
||||
* If the materialized promise is completed with a failure, then the returned source will terminate with that error.
|
||||
* If the downstream of this source cancels before the promise has been completed, then the promise will be completed
|
||||
* If the materialized promise is completed with a failure, then the source will fail with that error.
|
||||
* If the downstream of this source cancels or fails before the promise has been completed, then the promise will be completed
|
||||
* with None.
|
||||
*/
|
||||
def maybe[T]: Source[T, Promise[Option[T]]] =
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue