From 74adecb4e7025a82f2f76adebf9499b60ca1624c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 16 Oct 2019 17:02:12 +0200 Subject: [PATCH] Align lazy and future operators #26446 --- .../project/migration-guide-2.5.x-2.6.x.md | 75 ++++ .../operators/Flow/completionStageFlow.md | 42 +++ .../stream/operators/Flow/futureFlow.md | 43 +++ .../operators/Flow/lazyCompletionStageFlow.md | 35 ++ .../paradox/stream/operators/Flow/lazyFlow.md | 36 ++ .../stream/operators/Flow/lazyFutureFlow.md | 42 +++ .../stream/operators/Flow/lazyInitAsync.md | 14 +- .../operators/Sink/completionStageSink.md | 23 ++ .../stream/operators/Sink/futureSink.md | 30 ++ .../operators/Sink/lazyCompletionStageSink.md | 31 ++ .../stream/operators/Sink/lazyFutureSink.md | 38 ++ .../stream/operators/Sink/lazyInitAsync.md | 4 +- .../paradox/stream/operators/Sink/lazySink.md | 37 ++ .../operators/Source/completionStage.md | 32 ++ .../operators/Source/completionStageSource.md | 23 ++ .../operators/Source/fromCompletionStage.md | 4 +- .../stream/operators/Source/fromFuture.md | 6 +- .../operators/Source/fromFutureSource.md | 4 +- .../Source/fromSourceCompletionStage.md | 4 +- .../paradox/stream/operators/Source/future.md | 35 ++ .../stream/operators/Source/futureSource.md | 29 ++ .../paradox/stream/operators/Source/lazily.md | 4 +- .../stream/operators/Source/lazilyAsync.md | 4 +- .../operators/Source/lazyCompletionStage.md | 25 ++ .../Source/lazyCompletionStageSource.md | 27 ++ .../stream/operators/Source/lazyFuture.md | 33 ++ .../operators/Source/lazyFutureSource.md | 34 ++ .../stream/operators/Source/lazySingle.md | 32 ++ .../stream/operators/Source/lazySource.md | 35 ++ .../main/paradox/stream/operators/index.md | 56 ++- .../stream/operators/flow/FutureFlow.java | 44 +++ .../stream/operators/SourceOperators.scala | 2 +- .../stream/operators/flow/FutureFlow.scala | 35 ++ .../artery/tcp/ArteryTcpTransport.scala | 8 +- .../akka/stream/tck/FuturePublisherTest.scala | 2 +- .../stream/javadsl/LazyAndFutureFlowTest.java | 67 ++++ .../javadsl/LazyAndFutureSourcesTest.java | 106 ++++++ .../stream/DslFactoriesConsistencySpec.scala | 11 +- .../akka/stream/scaladsl/FlowConcatSpec.scala | 2 +- .../scaladsl/FlowFlattenMergeSpec.scala | 6 +- .../stream/scaladsl/FlowFromFutureSpec.scala | 2 + .../scaladsl/FutureFlattenSourceSpec.scala | 206 ----------- .../stream/scaladsl/GraphConcatSpec.scala | 2 +- .../stream/scaladsl/LazilyAsyncSpec.scala | 2 + .../akka/stream/scaladsl/LazyFlowSpec.scala | 209 +++++++++-- .../akka/stream/scaladsl/LazySinkSpec.scala | 2 + .../akka/stream/scaladsl/LazySourceSpec.scala | 325 ++++++++++++++++-- .../snapshot/MaterializerStateSpec.scala | 2 +- .../stream/NeverMaterializedException.scala | 12 + .../scala/akka/stream/impl/LazySource.scala | 13 +- .../main/scala/akka/stream/impl/Sinks.scala | 10 +- .../akka/stream/impl/fusing/GraphStages.scala | 17 +- .../scala/akka/stream/impl/fusing/Ops.scala | 110 +++--- .../main/scala/akka/stream/javadsl/Flow.scala | 65 +++- .../main/scala/akka/stream/javadsl/Sink.scala | 44 ++- .../scala/akka/stream/javadsl/Source.scala | 119 ++++++- .../scala/akka/stream/scaladsl/Flow.scala | 70 +++- .../scala/akka/stream/scaladsl/Sink.scala | 48 ++- .../scala/akka/stream/scaladsl/Source.scala | 97 +++++- 59 files changed, 2091 insertions(+), 384 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/Flow/completionStageFlow.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Flow/lazyCompletionStageFlow.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/completionStageSink.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/futureSink.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/completionStage.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/future.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/futureSource.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStage.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStageSource.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/lazyFuture.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/lazyFutureSource.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/lazySingle.md create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/lazySource.md create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/flow/FutureFlow.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/flow/FutureFlow.scala create mode 100644 akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureFlowTest.java create mode 100644 akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/NeverMaterializedException.scala diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 70bb1b12ea..b89c4276f0 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -717,3 +717,78 @@ This also means that custom `GraphStage` implementations should be changed to pa cancellation cause when downstream cancels by implementing the `OutHandler.onDownstreamFinish` signature taking a `cause` parameter and calling `cancelStage(cause)` to pass the cause upstream. The old zero-argument `onDownstreamFinish` method has been deprecated. + + +### Lazy and async stream operator changes + +The operators that provide support for lazy and @scala[`Future`]@java[`CompletionStage`] stream construction were revised +to be more consistent. + +The materialized value is now no longer wrapped in an @scala[`Option`]@java[`Optional`], instead the @scala[`Future`]@java[`CompletionStage`] +is failed with a `akka.stream.NeverMaterializedException` in the cases that would previously lead to @scala[`None`]@java[an empty `Optional`] + +A deferred creation of the stream based on the initial element like how the deprecated `lazyInit` worked can be achieved by combining +@scala[`future(Flow|Sink)`] @java[`completionStage(Flow|Sink)`] with `prefixAndTail`. See example in @scala[@ref:[futureFlow](../stream/operators/Flow/futureFlow.md)] +@java[@ref:[completionStageFlow](../stream/operators/Flow/completionStageFlow.md)]. + +#### javadsl.Flow + +| old | new | +------------------------|---------------- +| lazyInit | @ref:[lazyCompletionStageFlow](../stream/operators/Flow/lazyCompletionStageFlow.md) in combination with `prefixAndTail(1)` | +| lazyInitAsync | @ref:[lazyCompletionStageFlow](../stream/operators/Flow/lazyCompletionStageFlow.md) | +| | @ref:[completionStageFlow](../stream/operators/Flow/completionStageFlow.md) | +| | @ref:[lazyFlow](../stream/operators/Flow/lazyFlow.md) | + +### javadsl.Sink + +| old | new | +------------------------|---------------- +| lazyInit | @ref:[lazyCompletionStageSink](../stream/operators/Sink/lazyCompletionStageSink.md) in combination with `Flow.prefixAndTail(1)` | +| lazyInitAsync | @ref:[lazyCompletionStageSink](../stream/operators/Sink/lazyCompletionStageSink.md) | +| | @ref:[completionStageSink](../stream/operators/Sink/completionStageSink.md) | +| | @ref:[lazySink](../stream/operators/Sink/lazySink.md) | + +### javadsl.Source + +| old | new | +--------------------------|---------------- +| fromFuture | @ref:[future](../stream/operators/Source/future.md) | +| fromCompletionStage | @ref:[completionStage](../stream/operators/Source/completionStage.md) | +| fromFutureSource | @ref:[futureSource](../stream/operators/Source/futureSource.md) | +| fromSourceCompletionStage | @ref:[completionStageSource](../stream/operators/Source/completionStageSource.md) | +| lazily | @ref:[lazySource](../stream/operators/Source/lazySource.md) | +| lazilyAsync | @ref:[lazyCompletionStage](../stream/operators/Source/lazyCompletionStage.md) | +| | @ref:[lazySingle](../stream/operators/Source/lazySingle.md) | +| | @ref:[lazyCompletionStageSource](../stream/operators/Source/lazyCompletionStageSource.md) | + +### scaladsl.Flow + +| old | new | +--------------------------|---------------- +| lazyInit | @ref:[lazyFutureFlow](../stream/operators/Flow/lazyFutureFlow.md) | +| lazyInitAsync | @ref:[lazyFutureFlow](../stream/operators/Flow/lazyFutureFlow.md) | +| | @ref:[futureFlow](../stream/operators/Flow/futureFlow.md) | +| | @ref:[lazyFlow](../stream/operators/Flow/lazyFlow.md) | + +### scaladsl.Sink + +| old | new | +------------------------|---------------- +| lazyInit | @ref:[lazyFutureSink](../stream/operators/Sink/lazyFutureSink.md) in combination with `Flow.prefixAndTail(1)` | +| lazyInitAsync | @ref:[lazyFutureSink](../stream/operators/Sink/lazyFutureSink.md) | +| | @ref:[futureSink](../stream/operators/Sink/futureSink.md) | +| | @ref:[lazySink](../stream/operators/Sink/lazySink.md) | + +### scaladsl.Source + +| old | new | +--------------------------|---------------- +| fromFuture | @ref:[future](../stream/operators/Source/future.md) | +| fromCompletionStage | @ref:[completionStage](../stream/operators/Source/completionStage.md) | +| fromFutureSource | @ref:[futureSource](../stream/operators/Source/futureSource.md) | +| fromSourceCompletionStage | | +| lazily | @ref:[lazySource](../stream/operators/Source/lazySource.md) | +| lazilyAsync | @ref:[lazyFuture](../stream/operators/Source/lazyFuture.md) | +| | @ref:[lazySingle](../stream/operators/Source/lazySingle.md) | +| | @ref:[lazyFutureSource](../stream/operators/Source/lazyFutureSource.md) | diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/completionStageFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/completionStageFlow.md new file mode 100644 index 0000000000..7cfda7e3ff --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Flow/completionStageFlow.md @@ -0,0 +1,42 @@ +# Flow.completionStageFlow + +Streams the elements through the given future flow once it successfully completes. + +@ref[Simple operators](../index.md#simple-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #futureFlow } + +@@@ + +## Description + +Streams the elements through the given flow once the `CompletionStage` successfully completes. +If the future fails the stream is failed. + +## Examples + +A deferred creation of the stream based on the initial element by combining `completionStageFlow` +with `prefixAndTail` like so: + +Scala +: @@snip [FutureFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/FutureFlow.java) { #base-on-first-element } + + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the internal flow is successfully created and it emits + +**backpressures** when the internal flow is successfully created and it backpressures + +**completes** when upstream completes and all elements have been emitted from the internal flow + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md new file mode 100644 index 0000000000..205709624c --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Flow/futureFlow.md @@ -0,0 +1,43 @@ +# Flow.futureFlow + +Streams the elements through the given future flow once it successfully completes. + +@ref[Simple operators](../index.md#simple-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #futureFlow } + +@@@ + +## Description + +Streams the elements through the given future flow once it successfully completes. +If the future fails the stream is failed. + +## Examples + +A deferred creation of the stream based on the initial element can be achieved by combining `futureFlow` +with `prefixAndTail` like so: + +Scala +: @@snip [FutureFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/FutureFlow.scala) { #base-on-first-element } + + + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the internal flow is successfully created and it emits + +**backpressures** when the internal flow is successfully created and it backpressures + +**completes** when upstream completes and all elements have been emitted from the internal flow + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyCompletionStageFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyCompletionStageFlow.md new file mode 100644 index 0000000000..8afc8046c7 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyCompletionStageFlow.md @@ -0,0 +1,35 @@ +# Flow.lazyCompletionStageFlow + +Defers creation and materialization of a `Flow` until there is a first element. + +@ref[Simple operators](../index.md#simple-operators) + + +## Description + +When the first element comes from upstream the actual `CompletionStage` is created and when that completes it is materialized +and inserted in the stream. +The internal `Flow` will not be created if there are no elements on completion or failure of up or downstream. + +The materialized value of the `Flow` will be the materialized value of the created internal flow if it is materialized +and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. + +See also @ref:[lazyFlow](lazyFlow.md). + +Can be combined with `prefixAndTail(1)` to base the flow construction on the initial element triggering creation. +See @ref:[lazyFlow](lazyFlow.md) for sample. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the internal flow is successfully created and it emits + +**backpressures** when the internal flow is successfully created and it backpressures + +**completes** when upstream completes and all elements have been emitted from the internal flow + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md new file mode 100644 index 0000000000..3a63c8b98b --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFlow.md @@ -0,0 +1,36 @@ +# Flow.lazyFlow + +Defers creation and materialization of a `Flow` until there is a first element. + +@ref[Simple operators](../index.md#simple-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #lazyFlow } + +@@@ + +## Description + +When the first element comes from upstream the actual `Flow` is created and materialized. +The internal `Flow` will not be created if there are no elements on completion or failure of up or downstream. + +The materialized value of the `Flow` will be the materialized value of the created internal flow if it is materialized +and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the internal flow is successfully created and it emits + +**backpressures** when the internal flow is successfully created and it backpressures + +**completes** when upstream completes and all elements have been emitted from the internal flow + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md new file mode 100644 index 0000000000..1be4a6af28 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyFutureFlow.md @@ -0,0 +1,42 @@ +# Flow.lazyFutureFlow + +Defers creation and materialization of a `Flow` until there is a first element. + +@ref[Simple operators](../index.md#simple-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #lazyFlow } + +@@@ + +## Description + +When the first element comes from upstream the actual `Future[Flow]` is created and when that completes it is materialized +and inserted in the stream. +The internal `Flow` will not be created if there are no elements on completion or failure of up or downstream. + +The materialized value of the `Flow` will be the materialized value of the created internal flow if it is materialized +and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. + +See also @ref:[lazyFlow](lazyFlow.md). + +Can be combined with `prefixAndTail(1)` to base the flow construction on the initial element triggering creation. +See @ref:[lazyFlow](lazyFlow.md) for sample. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the internal flow is successfully created and it emits + +**backpressures** when the internal flow is successfully created and it backpressures + +**completes** when upstream completes and all elements have been emitted from the internal flow + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md b/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md index 97c103e942..30feb1ca50 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/lazyInitAsync.md @@ -1,6 +1,6 @@ # Flow.lazyInitAsync -Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument. +`lazyInitAsync` has been deprecated in 2.6.0 use `Flow.lazyFutureFlow` in combination with `prefixAndTail` instead. @ref[Simple operators](../index.md#simple-operators) @@ -14,17 +14,9 @@ Creates a real `Flow` upon receiving the first element by calling relevant `flow ## Description -Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument. -Internal `Flow` will not be created if there are no elements, because of completion or error. -The materialized value of the `Flow` will be the materialized value of the created internal flow. +`fromCompletionStage` has been deprecated in 2.6.0 use @ref:[lazyFutureFlow](lazyFutureFlow.md) in combination with `prefixAndTail` instead. -The materialized value of the `Flow` is a @scala[`Future[Option[M]]`]@java[`CompletionStage>`] that is -completed with @scala[`Some(mat)`]@java[`Optional.of(mat)`] when the internal flow gets materialized or with @scala[`None`] -@java[an empty optional] when there where no elements. If the flow materialization (including the call of the `flowFactory`) -fails then the future is completed with a failure. - -Adheres to the @scala[@scaladoc[`ActorAttributes.SupervisionStrategy`](akka.stream.ActorAttributes$$SupervisionStrategy)] -@java[`ActorAttributes.SupervisionStrategy`] attribute. +Defers creation until a first element arrives. ## Reactive Streams semantics diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/completionStageSink.md b/akka-docs/src/main/paradox/stream/operators/Sink/completionStageSink.md new file mode 100644 index 0000000000..186c187e6d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/completionStageSink.md @@ -0,0 +1,23 @@ +# Sink.completionStageSink + +Streams the elements to the given future sink once it successfully completes. + +@ref[Sink operators](../index.md#sink-operators) + + +## Description + +Streams the elements through the given future flow once it successfully completes. +If the future fails the stream is failed. + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** if the future fails or if the created sink cancels + +**backpressures** when initialized and when created sink backpressures + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/futureSink.md b/akka-docs/src/main/paradox/stream/operators/Sink/futureSink.md new file mode 100644 index 0000000000..24784fe624 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/futureSink.md @@ -0,0 +1,30 @@ +# Sink.futureSink + +Streams the elements to the given future sink once it successfully completes. + +@ref[Sink operators](../index.md#sink-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #futureSink } + +@@@ + +## Description + +Streams the elements through the given future flow once it successfully completes. +If the future fails the stream is failed. + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** if the future fails or if the created sink cancels + +**backpressures** when initialized and when created sink backpressures + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md b/akka-docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md new file mode 100644 index 0000000000..9deb5452e8 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/lazyCompletionStageSink.md @@ -0,0 +1,31 @@ +# Sink.lazyCompletionStageSink + +Defers creation and materialization of a `Sink` until there is a first element. + +@ref[Sink operators](../index.md#sink-operators) + + +## Description + +When the first element comes from upstream the `CompletionStage` is created. When that completes successfully with a sink +that is materialized and inserted in the stream. +The internal `Sink` will not be created if the stream completes of fails before any element got through. + +The materialized value of the `Sink` will be the materialized value of the created internal flow if it is materialized +and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. + +Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element. + +See also @ref:[lazySink](lazySink.md). + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** if the future fails or if the created sink cancels + +**backpressures** when initialized and when created sink backpressures + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md b/akka-docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md new file mode 100644 index 0000000000..a56a5ae683 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/lazyFutureSink.md @@ -0,0 +1,38 @@ +# Sink.lazyFutureSink + +Defers creation and materialization of a `Sink` until there is a first element. + +@ref[Sink operators](../index.md#sink-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #lazySink } + +@@@ + +## Description + +When the first element comes from upstream the `Future[Sink]` is created. When that completes successfully with a sink +that is materialized and inserted in the stream. +The internal `Sink` will not be created if the stream completes of fails before any element got through. + +The materialized value of the `Sink` will be the materialized value of the created internal flow if it is materialized +and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. + +Can be combined with @ref:[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element. + +See also @ref:[lazySink](lazySink.md). + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** if the future fails or if the created sink cancels + +**backpressures** when initialized and when created sink backpressures + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/lazyInitAsync.md b/akka-docs/src/main/paradox/stream/operators/Sink/lazyInitAsync.md index f82296f93a..900d989dce 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/lazyInitAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/lazyInitAsync.md @@ -1,6 +1,6 @@ # Sink.lazyInitAsync -Creates a real `Sink` upon receiving the first element. +`lazyInitAsync` has been deprecated in 2.6.0, use `Sink.lazyFutureSink` @ref[Sink operators](../index.md#sink-operators) @@ -14,6 +14,8 @@ Creates a real `Sink` upon receiving the first element. ## Description +`lazyInitAsync` has been deprecated in 2.6.0, use @ref:[lazyFutureSink](lazyFutureSink.md) instead. + Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, because of completion or error. diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md b/akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md new file mode 100644 index 0000000000..3c54d5968a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/lazySink.md @@ -0,0 +1,37 @@ +# Sink.lazySink + +Defers creation and materialization of a `Sink` until there is a first element. + +@ref[Sink operators](../index.md#sink-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #lazySink } + +@@@ + +## Description + +When the first element comes from upstream the actual `Sink` is created and materialized. +The internal `Sink` will not be created if the stream completes of fails before any element got through. + +The materialized value of the `Sink` will be the materialized value of the created internal flow if it is materialized +and failed with a `akka.stream.NeverMaterializedException` if the stream fails or completes without the flow being materialized. + +Can be combined with @ref[prefixAndTail](../Source-or-Flow/prefixAndTail.md) to base the sink on the first element. + +See also @ref:[lazyFutureSink](lazyFutureSink.md) and @ref:[lazyCompletionStageSink](lazyCompletionStageSink.md). + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** if the future fails or if the created sink cancels + +**backpressures** when initialized and when created sink backpressures + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md b/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md new file mode 100644 index 0000000000..4c4358eb83 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md @@ -0,0 +1,32 @@ +# completionStage + +Send the single value of the `CompletionStage` when it completes and there is demand. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #completionStage } + +@@@ + +## Description + +Send the single value of the `CompletionStage` when it completes and there is demand. +If the `CompletionStage` completes with `null` stage is completed without emitting a value. +If the `CompletionStage` fails the stream is failed with that exception. + +For the corresponding operator for the Scala standard library `Future` see @ref:[future](future.md). + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the future completes + +**completes** after the future has completed + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md b/akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md new file mode 100644 index 0000000000..4bdd85c46d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/completionStageSource.md @@ -0,0 +1,23 @@ +# completionStageSource + +Streams the elements of an asynchronous source once its given *completion* operator completes. + +@ref[Source operators](../index.md#source-operators) + +## Signature + +## Description + +Streams the elements of an asynchronous source once its given *completion* operator completes. +If the *completion* fails the stream is failed with that exception. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the next value from the asynchronous source, once its *completion operator* has completed + +**completes** after the asynchronous source completes + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md b/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md index b8fd02375b..0c5d829176 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromCompletionStage.md @@ -1,6 +1,6 @@ # fromCompletionStage -Send the single value of the `CompletionStage` when it completes and there is demand. +`fromCompletionStage` has been deprecated in 2.6.0, use `Source.completionStage` @ref[Source operators](../index.md#source-operators) @@ -14,6 +14,8 @@ Send the single value of the `CompletionStage` when it completes and there is de ## Description +`fromCompletionStage` has been deprecated in 2.6.0, use @ref:[completionStage](completionStage.md) instead. + Send the single value of the `CompletionStage` when it completes and there is demand. If the `CompletionStage` completes with `null` stage is completed without emitting a value. If the `CompletionStage` fails the stream is failed with that exception. diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md b/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md index 215a619720..d6d289eccb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromFuture.md @@ -1,6 +1,6 @@ # fromFuture -Send the single value of the `Future` when it completes and there is demand. +`fromFuture` has been deprecated in 2.6.0, use `Source.future` instead. @ref[Source operators](../index.md#source-operators) @@ -14,6 +14,8 @@ Send the single value of the `Future` when it completes and there is demand. ## Description +`fromFuture` has been deprecated in 2.6.0, use @ref:[future](future.md) instead. + Send the single value of the `Future` when it completes and there is demand. If the future fails the stream is failed with that exception. @@ -28,6 +30,4 @@ If the future fails the stream is failed with that exception. @@@ ## Example -Scala -: @@snip [SourceFromFuture.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #sourceFromFuture } diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromFutureSource.md b/akka-docs/src/main/paradox/stream/operators/Source/fromFutureSource.md index d347448624..aa6254cb0b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/fromFutureSource.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromFutureSource.md @@ -1,6 +1,6 @@ # fromFutureSource -Streams the elements of the given future source once it successfully completes. +`fromFutureSource` has been deprecated in 2.6.0, use `Source.futureSource` instead. @ref[Source operators](../index.md#source-operators) @@ -14,6 +14,8 @@ Streams the elements of the given future source once it successfully completes. ## Description +`fromFutureSource` has been deprecated in 2.6.0, use @ref:[futureSource](futureSource.md) instead. + Streams the elements of the given future source once it successfully completes. If the future fails the stream is failed. diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromSourceCompletionStage.md b/akka-docs/src/main/paradox/stream/operators/Source/fromSourceCompletionStage.md index 6ecb828d43..f077d964a2 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/fromSourceCompletionStage.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromSourceCompletionStage.md @@ -1,6 +1,6 @@ # fromSourceCompletionStage -Streams the elements of an asynchronous source once its given *completion* operator completes. +`fromSourceCompletionStage` has been deprecated in 2.6.0, use `Source.completionStageSource` instead. @ref[Source operators](../index.md#source-operators) @@ -8,6 +8,8 @@ Streams the elements of an asynchronous source once its given *completion* opera ## Description +`fromSourceCompletionStage` has been deprecated in 2.6.0, use @ref:[completionStageSource](completionStageSource.md) instead. + Streams the elements of an asynchronous source once its given *completion* operator completes. If the *completion* fails the stream is failed with that exception. diff --git a/akka-docs/src/main/paradox/stream/operators/Source/future.md b/akka-docs/src/main/paradox/stream/operators/Source/future.md new file mode 100644 index 0000000000..5bcc285c17 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/future.md @@ -0,0 +1,35 @@ +# fromFuture + +Send the single value of the `Future` when it completes and there is demand. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #future } + +@@@ + +## Description + +Send the single value of the `Future` when it completes and there is demand. +If the future fails the stream is failed with that exception. + +For the corresponding operator for the Java standard library `CompletionStage` see @ref:[completionStage](completionStage.md). + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the future completes + +**completes** after the future has completed + +@@@ + +## Example +Scala +: @@snip [SourceFromFuture.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #sourceFromFuture } + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/futureSource.md b/akka-docs/src/main/paradox/stream/operators/Source/futureSource.md new file mode 100644 index 0000000000..389f2e86a3 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/futureSource.md @@ -0,0 +1,29 @@ +# futureSource + +Streams the elements of the given future source once it successfully completes. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #futureSource } + +@@@ + +## Description + +Streams the elements of the given future source once it successfully completes. +If the future fails the stream is failed. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the next value from the *future* source, once it has completed + +**completes** after the *future* source completes + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazily.md b/akka-docs/src/main/paradox/stream/operators/Source/lazily.md index ed68ce14bb..6795432a0a 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/lazily.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazily.md @@ -1,6 +1,6 @@ # lazily -Defers creation and materialization of a `Source` until there is demand. +`lazily` has been deprecated in 2.6.0, use `Source.lazySource` instead. @ref[Source operators](../index.md#source-operators) @@ -14,6 +14,8 @@ Defers creation and materialization of a `Source` until there is demand. ## Description +`lazily` has been deprecated in 2.6.0, use @ref:[lazySource](lazySource.md) instead. + Defers creation and materialization of a `Source` until there is demand. ## Reactive Streams semantics diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazilyAsync.md b/akka-docs/src/main/paradox/stream/operators/Source/lazilyAsync.md index 3816ed8944..ba79dc5773 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/lazilyAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazilyAsync.md @@ -1,6 +1,6 @@ # lazilyAsync -Defers creation and materialization of a `CompletionStage` until there is demand. +`lazily` has been deprecated in 2.6.0, use `Source.lazyFutureSource` instead. @ref[Source operators](../index.md#source-operators) @@ -8,6 +8,8 @@ Defers creation and materialization of a `CompletionStage` until there is demand ## Description +`lazily` has been deprecated in 2.6.0, use @ref:[lazyFutureSource](lazyFutureSource.md) instead. + Defers creation and materialization of a `CompletionStage` until there is demand. ## Reactive Streams semantics diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStage.md b/akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStage.md new file mode 100644 index 0000000000..45281e24fe --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStage.md @@ -0,0 +1,25 @@ +# lazyCompletionStage + +Defers creation of a future of a single element source until there is demand. + +@ref[Source operators](../index.md#source-operators) + +## Description + +Invokes the user supplied factory when the first downstream demand arrives. When the returned future completes +successfully the value is emitted downstream as a single stream element. If the future or the factory fails the +stream is failed. + +Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts +the laziness and will trigger the factory immediately. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when there is downstream demand and the element factory returned future has completed + +**completes** after emitting the single element + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStageSource.md b/akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStageSource.md new file mode 100644 index 0000000000..f364b5d09e --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazyCompletionStageSource.md @@ -0,0 +1,27 @@ +# lazyCompletionStageSource + +Defers creation of a future source until there is demand. + +@ref[Source operators](../index.md#source-operators) + +## Description + +Invokes the user supplied factory when the first downstream demand arrives. When the returned `CompletionStage` completes +successfully the source switches over to the new source and emits downstream just like if it had been created up front. If the future or the factory fails the +stream is failed. + +Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts +the laziness and will trigger the factory immediately. + +See also @ref:[lazySource](lazySource.md). + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** depends on the wrapped `Source` + +**completes** depends on the wrapped `Source` + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazyFuture.md b/akka-docs/src/main/paradox/stream/operators/Source/lazyFuture.md new file mode 100644 index 0000000000..5032a54eeb --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazyFuture.md @@ -0,0 +1,33 @@ +# lazyFuture + +Defers creation of a future of a single element source until there is demand. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #lazyFuture } + +@@@ + +## Description + +Invokes the user supplied factory when the first downstream demand arrives. When the returned future completes +successfully the value is emitted downstream as a single stream element. If the future or the factory fails the +stream is failed. + +Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts +the laziness and will trigger the factory immediately. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when there is downstream demand and the element factory returned future has completed + +**completes** after emitting the single element + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazyFutureSource.md b/akka-docs/src/main/paradox/stream/operators/Source/lazyFutureSource.md new file mode 100644 index 0000000000..0a14ef20cb --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazyFutureSource.md @@ -0,0 +1,34 @@ +# lazyFutureSource + +Defers creation and materialization of a `Source` until there is demand. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #lazyFutureSource } + +@@@ + +## Description + +Invokes the user supplied factory when the first downstream demand arrives. When the returned future completes +successfully the source switches over to the new source and emits downstream just like if it had been created up front. + +Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts +the laziness and will trigger the factory immediately. + +See also @ref:[lazySource](lazySource.md). + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** depends on the wrapped `Source` + +**completes** depends on the wrapped `Source` + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazySingle.md b/akka-docs/src/main/paradox/stream/operators/Source/lazySingle.md new file mode 100644 index 0000000000..8432842d46 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazySingle.md @@ -0,0 +1,32 @@ +# lazySingle + +Defers creation of a single element source until there is demand. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #lazySingle } + +@@@ + +## Description + +Invokes the user supplied factory when the first downstream demand arrives, then emits the returned single value +downstream and completes the stream. + +Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts +the laziness and will trigger the factory immediately. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when there is downstream demand and the element factory has completed + +**completes** after emitting the single element + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/Source/lazySource.md b/akka-docs/src/main/paradox/stream/operators/Source/lazySource.md new file mode 100644 index 0000000000..a243e0ca0a --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/lazySource.md @@ -0,0 +1,35 @@ +# lazySource + +Defers creation and materialization of a `Source` until there is demand. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #lazySource } + +@@@ + +## Description + +Defers creation and materialization of a `Source` until there is demand, then emits the elements from the source +downstream just like if it had been created up front. + +See also @ref:[lazyFutureSource](lazyFutureSource.md). + +Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts +the laziness and will trigger the factory immediately. + + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** depends on the wrapped `Source` + +**completes** depends on the wrapped `Source` + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 67df16009c..d9ad0c6a3f 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -12,18 +12,28 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| |Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| +|Source|@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.| +|Source|@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.| |Source|@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.| |Source|@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.| |Source|@ref[failed](Source/failed.md)|Fail directly with a user specified exception.| |Source|@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].| -|Source|@ref[fromCompletionStage](Source/fromCompletionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.| -|Source|@ref[fromFuture](Source/fromFuture.md)|Send the single value of the `Future` when it completes and there is demand.| -|Source|@ref[fromFutureSource](Source/fromFutureSource.md)|Streams the elements of the given future source once it successfully completes.| +|Source|@ref[fromCompletionStage](Source/fromCompletionStage.md)|`fromCompletionStage` has been deprecated in 2.6.0, use `Source.completionStage`| +|Source|@ref[fromFuture](Source/fromFuture.md)|`fromFuture` has been deprecated in 2.6.0, use `Source.future` instead.| +|Source|@ref[fromFutureSource](Source/fromFutureSource.md)|`fromFutureSource` has been deprecated in 2.6.0, use `Source.futureSource` instead.| |Source|@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.| |Source|@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`.| -|Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.| -|Source|@ref[lazily](Source/lazily.md)|Defers creation and materialization of a `Source` until there is demand.| -|Source|@ref[lazilyAsync](Source/lazilyAsync.md)|Defers creation and materialization of a `CompletionStage` until there is demand.| +|Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|`fromSourceCompletionStage` has been deprecated in 2.6.0, use `Source.completionStageSource` instead.| +|Source|@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.| +|Source|@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.| +|Source|@ref[lazily](Source/lazily.md)|`lazily` has been deprecated in 2.6.0, use `Source.lazySource` instead.| +|Source|@ref[lazilyAsync](Source/lazilyAsync.md)|`lazily` has been deprecated in 2.6.0, use `Source.lazyFutureSource` instead.| +|Source|@ref[lazyCompletionStage](Source/lazyCompletionStage.md)|Defers creation of a future of a single element source until there is demand.| +|Source|@ref[lazyCompletionStageSource](Source/lazyCompletionStageSource.md)|Defers creation of a future source until there is demand.| +|Source|@ref[lazyFuture](Source/lazyFuture.md)|Defers creation of a future of a single element source until there is demand.| +|Source|@ref[lazyFutureSource](Source/lazyFutureSource.md)|Defers creation and materialization of a `Source` until there is demand.| +|Source|@ref[lazySingle](Source/lazySingle.md)|Defers creation of a single element source until there is demand.| +|Source|@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.| |Source|@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.| |Source|@ref[queue](Source/queue.md)|Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. | |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| @@ -49,18 +59,23 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav |Sink|@ref[asPublisher](Sink/asPublisher.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Publisher`.| |Sink|@ref[cancelled](Sink/cancelled.md)|Immediately cancel the stream| |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| +|Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. | |Sink|@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.| |Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.| |Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.| |Sink|@ref[fromMaterializer](Sink/fromMaterializer.md)|Defer the creation of a `Sink` until materialization and access `Materializer` and `Attributes`| |Sink|@ref[fromSubscriber](Sink/fromSubscriber.md)|Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink.| +|Sink|@ref[futureSink](Sink/futureSink.md)|Streams the elements to the given future sink once it successfully completes. | |Sink|@ref[head](Sink/head.md)|Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving, after this the stream is canceled.| |Sink|@ref[headOption](Sink/headOption.md)|Materializes into a @scala[`Future[Option[T]]`] @java[`CompletionStage>`] which completes with the first value arriving wrapped in @scala[`Some`] @java[`Optional`], or @scala[a `None`] @java[an empty Optional] if the stream completes without any elements emitted.| |Sink|@ref[ignore](Sink/ignore.md)|Consume all elements but discards them.| |Sink|@ref[last](Sink/last.md)|Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream completes.| |Sink|@ref[lastOption](Sink/lastOption.md)|Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage>`] which completes with the last value emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes.| -|Sink|@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Creates a real `Sink` upon receiving the first element. | +|Sink|@ref[lazyCompletionStageSink](Sink/lazyCompletionStageSink.md)|Defers creation and materialization of a `Sink` until there is a first element.| +|Sink|@ref[lazyFutureSink](Sink/lazyFutureSink.md)|Defers creation and materialization of a `Sink` until there is a first element.| +|Sink|@ref[lazyInitAsync](Sink/lazyInitAsync.md)|`lazyInitAsync` has been deprecated in 2.6.0, use `Sink.lazyFutureSink` | +|Sink|@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.| |Sink|@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.| |Sink|@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.| |Sink|@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.| @@ -129,6 +144,7 @@ depending on being backpressured by downstream or not. |Flow|@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.| |Source/Flow|@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.| |Source/Flow|@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.| +|Flow|@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.| |Source/Flow|@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.| |Source/Flow|@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.| |Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| @@ -137,9 +153,13 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[fold](Source-or-Flow/fold.md)|Start with current value `zero` and then apply the current and next value to the given function. When upstream completes, the current value is emitted downstream.| |Source/Flow|@ref[foldAsync](Source-or-Flow/foldAsync.md)|Just like `fold` but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| |Source/Flow|@ref[fromMaterializer](Source-or-Flow/fromMaterializer.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| +|Flow|@ref[futureFlow](Flow/futureFlow.md)|Streams the elements through the given future flow once it successfully completes.| |Source/Flow|@ref[grouped](Source-or-Flow/grouped.md)|Accumulate incoming events until the specified number of elements have been accumulated and then pass the collection of elements downstream.| |Source/Flow|@ref[intersperse](Source-or-Flow/intersperse.md)|Intersperse stream with provided element similar to `List.mkString`.| -|Flow|@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Creates a real `Flow` upon receiving the first element by calling relevant `flowFactory` given as an argument.| +|Flow|@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| +|Flow|@ref[lazyFlow](Flow/lazyFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| +|Flow|@ref[lazyFutureFlow](Flow/lazyFutureFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| +|Flow|@ref[lazyInitAsync](Flow/lazyInitAsync.md)|`lazyInitAsync` has been deprecated in 2.6.0 use `Flow.lazyFutureFlow` in combination with `prefixAndTail` instead.| |Source/Flow|@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.| |Source/Flow|@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Ensure stream boundedness by evaluating the cost of incoming elements using a cost function.| |Source/Flow|@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.| @@ -337,6 +357,13 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [failed](Source/failed.md) * [lazily](Source/lazily.md) * [lazilyAsync](Source/lazilyAsync.md) +* [future](Source/future.md) +* [completionStage](Source/completionStage.md) +* [futureSource](Source/futureSource.md) +* [lazySingle](Source/lazySingle.md) +* [lazyFuture](Source/lazyFuture.md) +* [lazySource](Source/lazySource.md) +* [lazyFutureSource](Source/lazyFutureSource.md) * [asSubscriber](Source/asSubscriber.md) * [actorRef](Source/actorRef.md) * [actorRefWithBackpressure](Source/actorRefWithBackpressure.md) @@ -347,6 +374,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [unfoldResourceAsync](Source/unfoldResourceAsync.md) * [@scala[apply]@java[from]](Source/from.md) * [range](Source/range.md) +* [completionStageSource](Source/completionStageSource.md) +* [lazyCompletionStage](Source/lazyCompletionStage.md) +* [lazyCompletionStageSource](Source/lazyCompletionStageSource.md) * [concat](Source-or-Flow/concat.md) * [prepend](Source-or-Flow/prepend.md) * [orElse](Source-or-Flow/orElse.md) @@ -427,6 +457,11 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [fromSinkAndSource](Flow/fromSinkAndSource.md) * [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md) * [lazyInitAsync](Flow/lazyInitAsync.md) +* [futureFlow](Flow/futureFlow.md) +* [lazyFlow](Flow/lazyFlow.md) +* [lazyFutureFlow](Flow/lazyFutureFlow.md) +* [completionStageFlow](Flow/completionStageFlow.md) +* [lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md) * [preMaterialize](Sink/preMaterialize.md) * [fromMaterializer](Sink/fromMaterializer.md) * [setup](Sink/setup.md) @@ -451,6 +486,11 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [actorRefWithBackpressure](Sink/actorRefWithBackpressure.md) * [queue](Sink/queue.md) * [lazyInitAsync](Sink/lazyInitAsync.md) +* [futureSink](Sink/futureSink.md) +* [lazySink](Sink/lazySink.md) +* [lazyFutureSink](Sink/lazyFutureSink.md) +* [completionStageSink](Sink/completionStageSink.md) +* [lazyCompletionStageSink](Sink/lazyCompletionStageSink.md) * [fromInputStream](StreamConverters/fromInputStream.md) * [asOutputStream](StreamConverters/asOutputStream.md) * [fromOutputStream](StreamConverters/fromOutputStream.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/flow/FutureFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/flow/FutureFlow.java new file mode 100644 index 0000000000..937695f772 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/flow/FutureFlow.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.flow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public class FutureFlow { + + private ActorSystem system = null; + + // #base-on-first-element + CompletionStage> processingFlow(int id) { + return CompletableFuture.completedFuture( + Flow.of(Integer.class).map(n -> "id: " + id + " value: " + n)); + } + // #base-on-first-element + + public void compileOnlyBaseOnFirst() { + // #base-on-first-element + + Source source = + Source.range(1, 10) + .prefixAndTail(1) + .flatMapConcat( + (pair) -> { + List head = pair.first(); + Source tail = pair.second(); + + int id = head.get(0); + + return tail.via(Flow.completionStageFlow(processingFlow(id))); + }); + // #base-on-first-element + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala index a70facc182..a481abd5d6 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala @@ -20,7 +20,7 @@ object SourceOperators { import scala.concurrent.Future - val source: Source[Int, NotUsed] = Source.fromFuture(Future.successful(10)) + val source: Source[Int, NotUsed] = Source.future(Future.successful(10)) val sink: Sink[Int, Future[Done]] = Sink.foreach((i: Int) => println(i)) val done: Future[Done] = source.runWith(sink) //10 diff --git a/akka-docs/src/test/scala/docs/stream/operators/flow/FutureFlow.scala b/akka-docs/src/test/scala/docs/stream/operators/flow/FutureFlow.scala new file mode 100644 index 0000000000..7bdf0ad336 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/flow/FutureFlow.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.flow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Source + +import scala.concurrent.Future + +class FutureFlow { + + implicit val system: ActorSystem = ??? + import system.dispatcher + + def compileOnlyBaseOnFirst(): Unit = { + // #base-on-first-element + def processingFlow(id: Int): Future[Flow[Int, String, NotUsed]] = + Future { + Flow[Int].map(n => s"id: $id, value: $n") + } + + val source: Source[String, NotUsed] = + Source(1 to 10).prefixAndTail(1).flatMapConcat { + case (List(id), tail) => + // base the Future flow creation on the first element + tail.via(Flow.futureFlow(processingFlow(id))) + } + // #base-on-first-element + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 70216a4292..b16f9fb40a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -150,7 +150,7 @@ private[remote] class ArteryTcpTransport( def flow(controlIdleKillSwitch: OptionVal[SharedKillSwitch]) = Flow[ByteString] - .via(Flow.lazyInitAsync(() => { + .via(Flow.lazyFlow(() => { // only open the actual connection if any new messages are sent afr.loFreq( TcpOutbound_Connected, @@ -158,10 +158,8 @@ private[remote] class ArteryTcpTransport( s"/ ${streamName(streamId)}") if (controlIdleKillSwitch.isDefined) outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(controlIdleKillSwitch) - Future.successful( - Flow[ByteString] - .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) - .via(connectionFlow)) + + Flow[ByteString].prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))).via(connectionFlow) })) .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal => Source.empty }) .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala index 7eae34121f..9bb4e993db 100644 --- a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala +++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FuturePublisherTest.scala @@ -14,7 +14,7 @@ class FuturePublisherTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val p = Promise[Int]() - val pub = Source.fromFuture(p.future).runWith(Sink.asPublisher(false)) + val pub = Source.future(p.future).runWith(Sink.asPublisher(false)) p.success(0) pub } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureFlowTest.java new file mode 100644 index 0000000000..3210097e62 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureFlowTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.stream.StreamTest; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class LazyAndFutureFlowTest extends StreamTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("LazyAndFutureFlowTest", AkkaSpec.testConf()); + + public LazyAndFutureFlowTest() { + super(actorSystemResource); + } + + // note these are minimal happy path tests to cover API, more thorough tests are on the Scala side + + @Test + public void completionStageFlow() throws Exception { + CompletionStage> result = + Source.single("one") + .via( + Flow.completionStageFlow( + CompletableFuture.completedFuture(Flow.fromFunction(str -> str)))) + .runWith(Sink.seq(), system); + + assertEquals(Arrays.asList("one"), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void lazyFlow() throws Exception { + CompletionStage> result = + Source.single("one") + .via(Flow.lazyFlow(() -> Flow.fromFunction(str -> str))) + .runWith(Sink.seq(), system); + + assertEquals(Arrays.asList("one"), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void lazyCompletionStageFlow() throws Exception { + CompletionStage> result = + Source.single("one") + .via( + Flow.lazyCompletionStageFlow( + () -> CompletableFuture.completedFuture(Flow.fromFunction(str -> str)))) + .runWith(Sink.seq(), system); + + assertEquals(Arrays.asList("one"), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java new file mode 100644 index 0000000000..da5940e2d3 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.Done; +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.Future; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class LazyAndFutureSourcesTest extends StreamTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("LazyAndFutureSourcesTest", AkkaSpec.testConf()); + + public LazyAndFutureSourcesTest() { + super(actorSystemResource); + } + + // note these are minimal happy path tests to cover API, more thorough tests are on the Scala side + + @Test + public void future() throws Exception { + CompletionStage> result = + Source.future(Future.successful("one")).runWith(Sink.seq(), system); + + assertEquals(Arrays.asList("one"), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void completionStage() throws Exception { + CompletionStage one = CompletableFuture.completedFuture("one"); + CompletionStage> result = Source.completionStage(one).runWith(Sink.seq(), system); + + assertEquals(Arrays.asList("one"), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void completionStageSource() throws Exception { + Pair, CompletionStage>> result = + Source.completionStageSource(CompletableFuture.completedFuture(Source.single("one"))) + .toMat(Sink.seq(), Keep.both()) + .run(system); + + CompletionStage nestedMatVal = result.first(); + CompletionStage> list = result.second(); + assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); + assertEquals(true, nestedMatVal.toCompletableFuture().isDone()); + } + + @Test + public void lazySingle() throws Exception { + CompletionStage> list = Source.lazySingle(() -> "one").runWith(Sink.seq(), system); + + assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void lazyCompletionStage() throws Exception { + CompletionStage> list = + Source.lazyCompletionStage(() -> CompletableFuture.completedFuture("one")) + .runWith(Sink.seq(), system); + + assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void lazySource() throws Exception { + Pair, CompletionStage>> result = + Source.lazySource(() -> Source.single("one")).toMat(Sink.seq(), Keep.both()).run(system); + + CompletionStage nestedMatVal = result.first(); + CompletionStage> list = result.second(); + assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); + assertEquals(true, nestedMatVal.toCompletableFuture().isDone()); + } + + @Test + public void lazyCompletionStageSource() throws Exception { + Pair, CompletionStage>> result = + Source.lazyCompletionStageSource( + () -> CompletableFuture.completedFuture(Source.single("one"))) + .toMat(Sink.seq(), Keep.both()) + .run(system); + + CompletionStage nestedMatVal = result.first(); + CompletionStage> list = result.second(); + assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); + assertEquals(true, nestedMatVal.toCompletableFuture().isDone()); + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index ea83b362cd..d785a1d26b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -21,7 +21,16 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { "toString", "getClass", "shape", - "identityTraversalBuilder") + "identityTraversalBuilder", + // futures in scaladsl vs completion stage in javadsl + "lazyFutureSource", // lazyCompletionStageSource + "futureSource", // completionStageSource + "lazyFuture", // lazyCompletionStage + "lazyFutureFlow", // lazyCompletionStageFlow + "futureFlow", // completionStageFlow + "futureSink", // completionStageSink + "lazyFutureSink" // lazyCompletionStageSink + ) val javaIgnore = Set("adapt") // the scaladsl -> javadsl bridge diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala index 62001ab9d0..472d52f6ba 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -120,7 +120,7 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { "correctly handle async errors in secondary upstream" in assertAllStagesStopped { val promise = Promise[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(List(1, 2, 3)).concat(Source.fromFuture(promise.future)).runWith(Sink.fromSubscriber(subscriber)) + Source(List(1, 2, 3)).concat(Source.future(promise.future)).runWith(Sink.fromSubscriber(subscriber)) val subscription = subscriber.expectSubscription() subscription.request(4) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala index e602ac0631..75dbc0cb45 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -27,7 +27,7 @@ class FlowFlattenMergeSpec extends StreamSpec { import system.dispatcher def src10(i: Int) = Source(i until (i + 10)) - def blocked = Source.fromFuture(Promise[Int].future) + def blocked = Source.future(Promise[Int].future) val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right) val toSet = toSeq.mapMaterializedValue(_.map(_.toSet)) @@ -108,7 +108,7 @@ class FlowFlattenMergeSpec extends StreamSpec { val p1, p2 = TestPublisher.probe[Int]() val ex = new Exception("buh") val p = Promise[Source[Int, NotUsed]] - (Source(List(Source.fromPublisher(p1), Source.fromPublisher(p2))) ++ Source.fromFuture(p.future)) + (Source(List(Source.fromPublisher(p1), Source.fromPublisher(p2))) ++ Source.future(p.future)) .flatMapMerge(5, identity) .runWith(Sink.head) p1.expectRequest() @@ -122,7 +122,7 @@ class FlowFlattenMergeSpec extends StreamSpec { val p1, p2 = TestPublisher.probe[Int]() val ex = new Exception("buh") val p = Promise[Int] - Source(List(Source.fromPublisher(p1), Source.fromPublisher(p2), Source.fromFuture(p.future))) + Source(List(Source.fromPublisher(p1), Source.fromPublisher(p2), Source.future(p.future))) .flatMapMerge(5, identity) .runWith(Sink.head) p1.expectRequest() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index ef4366f2c9..962e4d3a09 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -6,12 +6,14 @@ package akka.stream.scaladsl import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ +import com.github.ghik.silencer.silent import scala.concurrent.duration._ import scala.concurrent.Future import scala.concurrent.Promise import scala.util.control.NoStackTrace +@silent("deprecated") // testing deprecated API class FlowFromFutureSpec extends StreamSpec { "A Flow based on a Future" must { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala deleted file mode 100644 index be269cd1bc..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package akka.stream.scaladsl - -import java.util.concurrent.{ CompletableFuture, TimeUnit } - -import akka.stream._ -import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue } -import akka.stream.testkit.Utils.TE -import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } -import akka.stream.testkit.scaladsl.StreamTestKit._ -import akka.testkit.TestLatch - -import scala.concurrent.{ Await, Future, Promise } - -class FutureFlattenSourceSpec extends StreamSpec { - - implicit def ec = system.dispatcher - - "Future source" must { - - val underlying: Source[Int, String] = - Source(List(1, 2, 3)).mapMaterializedValue(_ => "foo") - - "emit the elements of the already successful future source" in assertAllStagesStopped { - val (sourceMatVal, sinkMatVal) = - Source.fromFutureSource(Future.successful(underlying)).toMat(Sink.seq)(Keep.both).run() - - // should complete as soon as inner source has been materialized - sourceMatVal.futureValue should ===("foo") - sinkMatVal.futureValue should ===(List(1, 2, 3)) - } - - "emit no elements before the future of source successful" in assertAllStagesStopped { - val c = TestSubscriber.manualProbe[Int]() - val sourcePromise = Promise[Source[Int, String]]() - Source.fromFutureSource(sourcePromise.future).runWith(Sink.asPublisher(true)).subscribe(c) - val sub = c.expectSubscription() - import scala.concurrent.duration._ - c.expectNoMessage(100.millis) - sub.request(3) - c.expectNoMessage(100.millis) - sourcePromise.success(underlying) - c.expectNext(1) - c.expectNext(2) - c.expectNext(3) - c.expectComplete() - } - - "emit the elements of the future source" in assertAllStagesStopped { - - val sourcePromise = Promise[Source[Int, String]]() - val (sourceMatVal, sinkMatVal) = - Source.fromFutureSource(sourcePromise.future).toMat(Sink.seq)(Keep.both).run() - sourcePromise.success(underlying) - // should complete as soon as inner source has been materialized - sourceMatVal.futureValue should ===("foo") - sinkMatVal.futureValue should ===(List(1, 2, 3)) - } - - "emit the elements from a source in a completion stage" in assertAllStagesStopped { - val (sourceMatVal, sinkMatVal) = - Source - .fromSourceCompletionStage( - // can't be inferred - CompletableFuture.completedFuture[Graph[SourceShape[Int], String]](underlying)) - .toMat(Sink.seq)(Keep.both) - .run() - - sourceMatVal.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS) should ===("foo") - sinkMatVal.futureValue should ===(List(1, 2, 3)) - } - - "handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped { - val sourcePromise = Promise[Source[Int, String]]() - - val probe = TestSubscriber.probe[Int]() - val sourceMatVal = - Source.fromFutureSource(sourcePromise.future).toMat(Sink.fromSubscriber(probe))(Keep.left).run() - - // wait for cancellation to occur - probe.ensureSubscription() - probe.request(1) - probe.cancel() - - // try to avoid a race between probe cancel and completing the promise - Thread.sleep(100) - - // even though canceled the underlying matval should arrive - sourcePromise.success(underlying) - val failure = sourceMatVal.failed.futureValue - failure shouldBe a[StreamDetachedException] - failure.getMessage should ===("Stream cancelled before Source Future completed") - } - - "fail if the underlying Future is failed" in assertAllStagesStopped { - val failure = TE("foo") - val underlying = Future.failed[Source[Int, String]](failure) - val (sourceMatVal, sinkMatVal) = Source.fromFutureSource(underlying).toMat(Sink.seq)(Keep.both).run() - sourceMatVal.failed.futureValue should ===(failure) - sinkMatVal.failed.futureValue should ===(failure) - } - - "fail as the underlying Future fails after outer source materialization" in assertAllStagesStopped { - val failure = TE("foo") - val sourcePromise = Promise[Source[Int, String]]() - val materializationLatch = TestLatch(1) - val (sourceMatVal, sinkMatVal) = - Source - .fromFutureSource(sourcePromise.future) - .mapMaterializedValue { value => - materializationLatch.countDown() - value - } - .toMat(Sink.seq)(Keep.both) - .run() - - // we don't know that materialization completed yet (this is still a bit racy) - Await.ready(materializationLatch, remainingOrDefault) - Thread.sleep(100) - sourcePromise.failure(failure) - - sourceMatVal.failed.futureValue should ===(failure) - sinkMatVal.failed.futureValue should ===(failure) - } - - "fail as the underlying Future fails after outer source materialization with no demand" in assertAllStagesStopped { - val failure = TE("foo") - val sourcePromise = Promise[Source[Int, String]]() - val testProbe = TestSubscriber.probe[Int]() - val sourceMatVal = - Source.fromFutureSource(sourcePromise.future).to(Sink.fromSubscriber(testProbe)).run() - - testProbe.expectSubscription() - sourcePromise.failure(failure) - - sourceMatVal.failed.futureValue should ===(failure) - } - - "handle back-pressure when the future completes" in assertAllStagesStopped { - val subscriber = TestSubscriber.probe[Int]() - val publisher = TestPublisher.probe[Int]() - - val sourcePromise = Promise[Source[Int, String]]() - - val matVal = Source.fromFutureSource(sourcePromise.future).to(Sink.fromSubscriber(subscriber)).run() - - subscriber.ensureSubscription() - - sourcePromise.success(Source.fromPublisher(publisher).mapMaterializedValue(_ => "woho")) - - // materialized value completes but still no demand - matVal.futureValue should ===("woho") - - // then demand and let an element through to see it works - subscriber.ensureSubscription() - subscriber.request(1) - publisher.expectRequest() - publisher.sendNext(1) - subscriber.expectNext(1) - publisher.sendComplete() - subscriber.expectComplete() - } - - "carry through cancellation to later materialized source" in assertAllStagesStopped { - val subscriber = TestSubscriber.probe[Int]() - val publisher = TestPublisher.probe[Int]() - - val sourcePromise = Promise[Source[Int, String]]() - - val matVal = Source.fromFutureSource(sourcePromise.future).to(Sink.fromSubscriber(subscriber)).run() - - subscriber.ensureSubscription() - - sourcePromise.success(Source.fromPublisher(publisher).mapMaterializedValue(_ => "woho")) - - // materialized value completes but still no demand - matVal.futureValue should ===("woho") - - // cancelling the outer source should carry through to the internal one - subscriber.ensureSubscription() - subscriber.cancel() - publisher.expectCancellation() - } - - class FailingMatGraphStage extends GraphStageWithMaterializedValue[SourceShape[Int], String] { - val out = Outlet[Int]("whatever") - override val shape: SourceShape[Int] = SourceShape(out) - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, String) = { - throw TE("INNER_FAILED") - } - - } - - "fail when the future source materialization fails" in assertAllStagesStopped { - val inner = Future.successful(Source.fromGraph(new FailingMatGraphStage)) - val (innerSourceMat: Future[String], outerSinkMat: Future[Seq[Int]]) = - Source.fromFutureSource(inner).toMat(Sink.seq)(Keep.both).run() - - outerSinkMat.failed.futureValue should ===(TE("INNER_FAILED")) - innerSourceMat.failed.futureValue should ===(TE("INNER_FAILED")) - } - } -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index 95d16b3b31..7c28a03f0b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -137,7 +137,7 @@ class GraphConcatSpec extends TwoStreamsSetup { .fromGraph(GraphDSL.create() { implicit b => val concat = b.add(Concat[Int]()) Source(List(1, 2, 3)) ~> concat.in(0) - Source.fromFuture(promise.future) ~> concat.in(1) + Source.future(promise.future) ~> concat.in(1) concat.out ~> Sink.fromSubscriber(subscriber) ClosedShape }) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala index b98269b341..2ecb2ce634 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazilyAsyncSpec.scala @@ -11,10 +11,12 @@ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.StreamSpec import akka.stream.testkit.TestSubscriber import akka.testkit.DefaultTimeout +import com.github.ghik.silencer.silent import org.scalatest.concurrent.ScalaFutures import scala.concurrent.Future +@silent("deprecated") // tests deprecated methods class LazilyAsyncSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import system.dispatcher diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala index aa349bae1d..2769bb3f4a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazyFlowSpec.scala @@ -5,18 +5,24 @@ package akka.stream.scaladsl import akka.NotUsed +import akka.stream.AbruptStageTerminationException +import akka.stream.Materializer +import akka.stream.NeverMaterializedException +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.TestPublisher import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSource -import akka.stream.testkit.StreamSpec -import akka.stream.testkit.TestPublisher -import org.scalatest.concurrent.ScalaFutures +import akka.testkit.TestProbe +import com.github.ghik.silencer.silent -import scala.concurrent.duration._ +import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.Promise +import scala.concurrent.duration._ +@silent("deprecated") // tests deprecated API as well class LazyFlowSpec extends StreamSpec(""" akka.stream.materializer.initial-input-buffer-size = 1 akka.stream.materializer.max-input-buffer-size = 1 @@ -24,7 +30,184 @@ class LazyFlowSpec extends StreamSpec(""" val ex = TE("") - "A LazyFlow" must { + "Flow.lazyFlow" must { + // more complete test coverage is for lazyFutureFlow since this is composition of that + "work in the happy case" in assertAllStagesStopped { + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.lazyFlow(() => Flow.fromFunction((n: Int) => n.toString)))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.futureValue should equal(Seq("1", "2", "3")) + deferredMatVal.isCompleted should ===(true) + } + + } + + "Flow.futureFlow" must { + // more complete test coverage is for lazyFutureFlow since this is composition of that + "work in the happy case" in assertAllStagesStopped { + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.futureFlow(Future.successful(Flow.fromFunction((n: Int) => n.toString))))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.futureValue should equal(Seq("1", "2", "3")) + deferredMatVal.isCompleted should ===(true) + } + } + + "Flow.lazyFutureFlow" must { + + "work in the happy case" in assertAllStagesStopped { + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.lazyFutureFlow(() => Future.successful(Flow.fromFunction((n: Int) => n.toString))))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.futureValue should equal(Seq("1", "2", "3")) + deferredMatVal.isCompleted should ===(true) + } + + "complete without creating internal flow when there was no elements in the stream" in assertAllStagesStopped { + val probe = TestProbe() + val result: (Future[NotUsed], Future[immutable.Seq[Int]]) = Source + .empty[Int] + .viaMat(Flow.lazyFutureFlow { () => + probe.ref ! "constructed" + Future.successful(Flow[Int]) + })(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.futureValue should equal(Seq.empty) + // and failing the matval + deferredMatVal.failed.futureValue shouldBe a[NeverMaterializedException] + probe.expectNoMessage(30.millis) // would have gotten it by now + } + + "complete without creating internal flow when the stream failed with no elements" in assertAllStagesStopped { + val probe = TestProbe() + val result: (Future[NotUsed], Future[immutable.Seq[Int]]) = Source + .failed[Int](TE("no-elements")) + .viaMat(Flow.lazyFutureFlow { () => + probe.ref ! "constructed" + Future.successful(Flow[Int]) + })(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.failed.futureValue shouldBe a[TE] + // and failing the matval + deferredMatVal.failed.futureValue shouldBe a[NeverMaterializedException] + probe.expectNoMessage(30.millis) // would have gotten it by now + } + + "fail the flow when the factory function fails" in assertAllStagesStopped { + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.lazyFutureFlow(() => throw TE("no-flow-for-you")))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.failed.futureValue shouldBe a[TE] + deferredMatVal.failed.futureValue shouldBe a[TE] + } + + "fail the flow when the future is initially failed" in assertAllStagesStopped { + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.lazyFutureFlow(() => Future.failed(TE("no-flow-for-you"))))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.failed.futureValue shouldBe a[TE] + deferredMatVal.failed.futureValue shouldBe a[TE] + } + + "fail the flow when the future is failed after the fact" in assertAllStagesStopped { + val promise = Promise[Flow[Int, String, NotUsed]]() + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.lazyFutureFlow(() => promise.future))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + + promise.failure(TE("later-no-flow-for-you")) + list.failed.futureValue shouldBe a[TE] + deferredMatVal.failed.futureValue shouldBe a[TE] + } + + "fail the flow when the future materialization fails" in assertAllStagesStopped { + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.lazyFutureFlow(() => + Future.successful(Flow[Int].map(_.toString).mapMaterializedValue(_ => throw TE("mat-failed")))))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + list.failed.futureValue shouldBe a[TE] + deferredMatVal.failed.futureValue shouldBe a[TE] + } + + "fail the flow when there was elements but the inner flow failed" in assertAllStagesStopped { + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source(List(1, 2, 3)) + .viaMat(Flow.lazyFutureFlow(() => Future.successful(Flow[Int].map(_ => throw TE("inner-stream-fail")))))( + Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + val deferredMatVal = result._1 + val list = result._2 + + list.failed.futureValue shouldBe a[TE] + deferredMatVal.futureValue should ===(NotUsed) // inner materialization did succeed + } + + "fail the mat val when the stream is abruptly terminated before it got materialized" in assertAllStagesStopped { + val expendableMaterializer = Materializer(system) + val promise = Promise[Flow[Int, String, NotUsed]]() + val result: (Future[NotUsed], Future[immutable.Seq[String]]) = + Source + .maybe[Int] + .viaMat(Flow.lazyFutureFlow(() => promise.future))(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run()(expendableMaterializer) + + val deferredMatVal = result._1 + val list = result._2 + + expendableMaterializer.shutdown() + + list.failed.futureValue shouldBe an[AbruptStageTerminationException] + deferredMatVal.failed.futureValue shouldBe an[AbruptStageTerminationException] + } + } + + "The deprecated LazyFlow ops" must { def mapF(e: Int): () => Future[Flow[Int, String, NotUsed]] = () => Future.successful(Flow.fromFunction[Int, String](i => (i * e).toString)) val flowF = Future.successful(Flow[Int]) @@ -91,7 +274,7 @@ class LazyFlowSpec extends StreamSpec(""" sub.expectComplete() } - "fail gracefully when flow factory method failed" in assertAllStagesStopped { + "fail gracefully when flow factory function failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() val probe = Source .fromPublisher(sourceProbe) @@ -147,20 +330,6 @@ class LazyFlowSpec extends StreamSpec(""" probe.cancel() sourceSub.expectCancellation() } - - "fail correctly when factory throw error" in assertAllStagesStopped { - val msg = "fail!" - val matFail = TE(msg) - val result = Source - .single("whatever") - .viaMat(Flow.lazyInitAsync(() => throw matFail))(Keep.right) - .toMat(Sink.ignore)(Keep.left) - .run() - - ScalaFutures.whenReady(result.failed) { e => - e.getMessage shouldBe msg - } - } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala index 1b99c63236..44897b1368 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -16,6 +16,7 @@ import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink +import com.github.ghik.silencer.silent import scala.collection.immutable import scala.concurrent.Await @@ -23,6 +24,7 @@ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ +@silent("deprecated") class LazySinkSpec extends StreamSpec(""" akka.stream.materializer.initial-input-buffer-size = 1 akka.stream.materializer.max-input-buffer-size = 1 diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index ae9c30af10..f3494507b2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -6,8 +6,7 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.AtomicBoolean -import akka.Done -import akka.stream.impl.LazySource +import akka.stream._ import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic import akka.stream.testkit.Utils.TE @@ -15,18 +14,112 @@ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.StreamSpec import akka.stream.testkit.TestPublisher import akka.stream.testkit.TestSubscriber -import akka.stream.{ Attributes, KillSwitches, Outlet, SourceShape } -import akka.testkit.{ DefaultTimeout, TestProbe } +import akka.testkit.DefaultTimeout +import akka.testkit.TestProbe +import akka.Done +import akka.NotUsed import org.scalatest.concurrent.ScalaFutures import scala.collection.immutable.Seq import scala.concurrent.Future +import scala.concurrent.Promise class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { - "A lazy source" should { + import system.dispatcher + + "Source.lazySingle" must { "work like a normal source, happy path" in assertAllStagesStopped { - val result = Source.fromGraph(LazySource(() => Source(List(1, 2, 3)))).runWith(Sink.seq) + val seq = Source.lazySingle(() => 1).runWith(Sink.seq) + seq.futureValue should ===(Seq(1)) + } + + "never construct the source when there was no demand" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val constructed = new AtomicBoolean(false) + Source + .lazySingle { () => + constructed.set(true) + 1 + } + .toMat(Sink.fromSubscriber(probe))(Keep.left) + .run() + probe.cancel() + + constructed.get() should ===(false) + } + + "fail correctly when factory function fails" in assertAllStagesStopped { + val failure = TE("couldn't create") + val termination: Future[Done] = + Source.lazySingle(() => throw failure).watchTermination()(Keep.right).toMat(Sink.ignore)(Keep.left).run() + + termination.failed.futureValue should ===(failure) + } + + } + + "Source.lazyFuture" must { + "work like a normal source, happy path, already completed future" in assertAllStagesStopped { + val seq = Source.lazyFuture(() => Future.successful(1)).runWith(Sink.seq) + + seq.futureValue should ===(Seq(1)) + } + + "work like a normal source, happy path, completing future" in assertAllStagesStopped { + val promise = Promise[Int]() + val seq = Source.lazyFuture(() => promise.future).runWith(Sink.seq) + promise.success(1) + seq.futureValue should ===(Seq(1)) + } + + "never construct the source when there was no demand" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val constructed = new AtomicBoolean(false) + Source + .lazySingle { () => + constructed.set(true) + 1 + } + .runWith(Sink.fromSubscriber(probe)) + probe.cancel() + + constructed.get() should ===(false) + } + + "fail correctly when factory function fails" in assertAllStagesStopped { + val failure = TE("couldn't create") + val termination = + Source.lazyFuture(() => throw failure).watchTermination()(Keep.right).toMat(Sink.ignore)(Keep.left).run() + + termination.failed.futureValue should ===(failure) + } + + "fail correctly when factory function returns a failed future" in assertAllStagesStopped { + val failure = TE("couldn't create") + val termination = + Source + .lazyFuture(() => Future.failed(failure)) + .watchTermination()(Keep.right) + .toMat(Sink.ignore)(Keep.left) + .run() + + termination.failed.futureValue should ===(failure) + } + + "fail correctly when factory function returns a future that fails" in assertAllStagesStopped { + val failure = TE("couldn't create") + val promise = Promise[Int]() + val termination = + Source.lazyFuture(() => promise.future).watchTermination()(Keep.right).toMat(Sink.ignore)(Keep.left).run() + promise.failure(failure) + termination.failed.futureValue should ===(failure) + } + } + + "Source.lazySource" must { + "work like a normal source, happy path" in assertAllStagesStopped { + val result = Source.lazySource(() => Source(List(1, 2, 3))).runWith(Sink.seq) result.futureValue should ===(Seq(1, 2, 3)) } @@ -34,29 +127,29 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "never construct the source when there was no demand" in assertAllStagesStopped { val probe = TestSubscriber.probe[Int]() val constructed = new AtomicBoolean(false) - Source - .fromGraph(LazySource { () => + val result = Source + .lazySource { () => constructed.set(true); Source(List(1, 2, 3)) - }) - .runWith(Sink.fromSubscriber(probe)) + } + .toMat(Sink.fromSubscriber(probe))(Keep.left) + .run() probe.cancel() constructed.get() should ===(false) + result.isCompleted should ===(false) } "fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped { - val matF = Source.fromGraph(LazySource(() => Source(List(1, 2, 3)))).toMat(Sink.cancelled)(Keep.left).run() + val lazyMatVal = Source.lazySource(() => Source(List(1, 2, 3))).toMat(Sink.cancelled)(Keep.left).run() - intercept[RuntimeException] { - matF.futureValue - } + lazyMatVal.failed.futureValue shouldBe a[NeverMaterializedException] } "stop consuming when downstream has cancelled" in assertAllStagesStopped { val outProbe = TestSubscriber.probe[Int]() val inProbe = TestPublisher.probe[Int]() - Source.fromGraph(LazySource(() => Source.fromPublisher(inProbe))).runWith(Sink.fromSubscriber(outProbe)) + Source.lazySource(() => Source.fromPublisher(inProbe)).runWith(Sink.fromSubscriber(outProbe)) outProbe.request(1) inProbe.expectRequest() @@ -70,9 +163,9 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { val probe = TestSubscriber.probe[Int]() val matF: Future[Done] = Source - .fromGraph(LazySource { () => + .lazySource { () => Source(List(1, 2, 3)).mapMaterializedValue(_ => Done) - }) + } .to(Sink.fromSubscriber(probe)) .run() @@ -84,11 +177,60 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { probe.cancel() } + "fail stage when upstream fails" in assertAllStagesStopped { + val outProbe = TestSubscriber.probe[Int]() + val inProbe = TestPublisher.probe[Int]() + + val lazyMatVal = + Source.lazySource(() => Source.fromPublisher(inProbe)).toMat(Sink.fromSubscriber(outProbe))(Keep.left).run() + + outProbe.request(1) + inProbe.expectRequest() + + lazyMatVal.futureValue should ===(NotUsed) // was completed + + inProbe.sendNext(27) + outProbe.expectNext(27) + val failure = TE("OMG Who set that on fire!?!") + inProbe.sendError(failure) + outProbe.expectError() should ===(failure) + } + + "fail when lazy source is failed" in assertAllStagesStopped { + val failure = TE("OMG Who set that on fire!?!") + val result = Source.lazySource(() => Source.failed(failure)).runWith(Sink.seq) + result.failed.futureValue should ===(failure) + } + + "fail correctly when factory function fails" in assertAllStagesStopped { + val failure = TE("couldn't create") + val lazyMatVal = Source.lazySource(() => throw failure).toMat(Sink.ignore)(Keep.left).run() + + lazyMatVal.failed.futureValue should ===(failure) + } + + "fail correctly when materialization of inner source fails" in assertAllStagesStopped { + val matFail = TE("fail!") + object FailingInnerMat extends GraphStage[SourceShape[String]] { + val out = Outlet[String]("out") + val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + throw matFail + } + } + + val (lazyMatVal, done) = + Source.lazySource(() => Source.fromGraph(FailingInnerMat)).toMat(Sink.ignore)(Keep.both).run() + + done.failed.futureValue should ===(matFail) + lazyMatVal.failed.futureValue should ===(matFail) + } + "propagate downstream cancellation cause when inner source has been materialized" in { val probe = TestProbe() val (doneF, killswitch) = Source - .lazily(() => + .lazySource(() => Source.maybe[Int].watchTermination()(Keep.right).mapMaterializedValue { done => probe.ref ! Done done @@ -102,19 +244,126 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { killswitch.abort(boom) doneF.failed.futureValue should ===(boom) } + } - "fail stage when upstream fails" in assertAllStagesStopped { + "Source.lazyFutureSource" must { + "work like a normal source, happy path" in assertAllStagesStopped { + val result = Source.lazyFutureSource(() => Future { Source(List(1, 2, 3)) }).runWith(Sink.seq) + + result.futureValue should ===(Seq(1, 2, 3)) + } + + "work like a normal source, happy path, already completed future" in assertAllStagesStopped { + val result = Source.lazyFutureSource(() => Future.successful { Source(List(1, 2, 3)) }).runWith(Sink.seq) + + result.futureValue should ===(Seq(1, 2, 3)) + } + + "never construct the source when there was no demand" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + val constructed = new AtomicBoolean(false) + val result = Source + .lazyFutureSource { () => + Future { + constructed.set(true) + Source(List(1, 2, 3)) + }; + } + .toMat(Sink.fromSubscriber(probe))(Keep.left) + .run() + probe.cancel() + + constructed.get() should ===(false) + result.isCompleted should ===(false) + } + + "fail the materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped { + val lazyMatVal: Future[NotUsed] = + Source.lazyFutureSource(() => Future { Source(List(1, 2, 3)) }).toMat(Sink.cancelled)(Keep.left).run() + + lazyMatVal.failed.futureValue shouldBe a[NeverMaterializedException] + } + + "stop consuming when downstream has cancelled" in assertAllStagesStopped { val outProbe = TestSubscriber.probe[Int]() val inProbe = TestPublisher.probe[Int]() - Source.fromGraph(LazySource(() => Source.fromPublisher(inProbe))).runWith(Sink.fromSubscriber(outProbe)) + Source.lazyFutureSource(() => Future { Source.fromPublisher(inProbe) }).runWith(Sink.fromSubscriber(outProbe)) outProbe.request(1) inProbe.expectRequest() inProbe.sendNext(27) outProbe.expectNext(27) - inProbe.sendError(TE("OMG Who set that on fire!?!")) - outProbe.expectError() shouldEqual TE("OMG Who set that on fire!?!") + outProbe.cancel() + inProbe.expectCancellation() + } + + "materialize when the source has been created" in assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + + val matF: Future[Done] = Source + .lazyFutureSource { () => + Future { + Source(List(1, 2, 3)).mapMaterializedValue(_ => Done) + } + } + .to(Sink.fromSubscriber(probe)) + .run() + + matF.value shouldEqual None + probe.request(1) + probe.expectNext(1) + matF.futureValue should ===(Done) + + probe.cancel() + } + + "fail stage when upstream fails" in assertAllStagesStopped { + val outProbe = TestSubscriber.probe[Int]() + val inProbe = TestPublisher.probe[Int]() + + val lazyMatVal: Future[NotUsed] = + Source + .lazyFutureSource(() => + Future { + Source.fromPublisher(inProbe) + }) + .toMat(Sink.fromSubscriber(outProbe))(Keep.left) + .run() + + outProbe.request(1) + lazyMatVal.futureValue should ===(NotUsed) // but completed + inProbe.expectRequest() + inProbe.sendNext(27) + outProbe.expectNext(27) + val failure = TE("OMG Who set that on fire!?!") + inProbe.sendError(failure) + outProbe.expectError() should ===(failure) + } + + "fail correctly when factory function fails" in assertAllStagesStopped { + val failure = TE("couldn't create") + val lazyMatVal: Future[NotUsed] = + Source.lazyFutureSource[Int, NotUsed](() => throw failure).toMat(Sink.ignore)(Keep.left).run() + + lazyMatVal.failed.futureValue should ===(failure) + } + + "fail correctly when factory function returns a failed future" in assertAllStagesStopped { + val failure = TE("couldn't create") + val lazyMatVal: Future[NotUsed] = + Source.lazyFutureSource[Int, NotUsed](() => Future.failed(failure)).toMat(Sink.ignore)(Keep.left).run() + + lazyMatVal.failed.futureValue should ===(failure) + } + + "fail correctly when factory function returns a future that fails" in assertAllStagesStopped { + val failure = TE("couldn't create") + val promise = Promise[Source[Int, NotUsed]]() + val lazyMatVal: Future[NotUsed] = + Source.lazyFutureSource(() => promise.future).toMat(Sink.ignore)(Keep.left).run() + promise.failure(failure) + lazyMatVal.failed.futureValue should ===(failure) } "fail correctly when materialization of inner source fails" in assertAllStagesStopped { @@ -127,10 +376,38 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } - val result = Source.lazily(() => Source.fromGraph(FailingInnerMat)).to(Sink.ignore).run() + val (lazyMatVal, done) = + Source + .lazyFutureSource(() => + Future { + Source.fromGraph(FailingInnerMat) + }) + .toMat(Sink.ignore)(Keep.both) + .run() - result.failed.futureValue should ===(matFail) + done.failed.futureValue should ===(matFail) + lazyMatVal.failed.futureValue should ===(matFail) + } + "propagate downstream cancellation cause when inner source has been materialized" in { + val probe = TestProbe() + val (terminationF, killswitch) = + Source + .lazyFutureSource(() => + Future { + Source.maybe[Int].watchTermination()(Keep.right).mapMaterializedValue { done => + probe.ref ! Done + done + } + }) + .mapMaterializedValue(_.flatten) + .viaMat(KillSwitches.single)(Keep.both) + .to(Sink.ignore) + .run() + val boom = TE("boom") + probe.expectMsg(Done) + killswitch.abort(boom) + terminationF.failed.futureValue should ===(boom) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala index 61d66d7128..c5859ffe5a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala @@ -41,7 +41,7 @@ class MaterializerStateSpec extends StreamSpec { "snapshot a running stream on the default dispatcher" in { val promise = Promise[Int]() - Source.fromFuture(promise.future).map(_.toString).zipWithIndex.runWith(Sink.seq) + Source.future(promise.future).map(_.toString).zipWithIndex.runWith(Sink.seq) awaitAssert({ val snapshot = MaterializerState.streamSnapshots(system).futureValue diff --git a/akka-stream/src/main/scala/akka/stream/NeverMaterializedException.scala b/akka-stream/src/main/scala/akka/stream/NeverMaterializedException.scala new file mode 100644 index 0000000000..6b608182a8 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/NeverMaterializedException.scala @@ -0,0 +1,12 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.stream + +final class NeverMaterializedException(cause: Throwable) + extends RuntimeException("Downstream canceled without triggering lazy source materialization", cause) { + + def this() = this(null) + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala index e30fc91225..9daafed61a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala @@ -35,13 +35,18 @@ import scala.util.control.NonFatal val logic = new GraphStageLogic(shape) with OutHandler { override def onDownstreamFinish(cause: Throwable): Unit = { - matPromise.failure( - new RuntimeException("Downstream canceled without triggering lazy source materialization", cause)) + matPromise.failure(new NeverMaterializedException(cause)) completeStage() } override def onPull(): Unit = { - val source = sourceFactory() + val source = try { + sourceFactory() + } catch { + case NonFatal(ex) => + matPromise.tryFailure(ex) + throw ex + } val subSink = new SubSinkInlet[T]("LazySource") subSink.pull() @@ -76,7 +81,7 @@ import scala.util.control.NonFatal setHandler(out, this) override def postStop() = { - matPromise.tryFailure(new RuntimeException("LazySource stopped without completing the materialized future")) + if (!matPromise.isCompleted) matPromise.tryFailure(new AbruptStageTerminationException(this)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 56a67a54e6..ec96318720 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -531,16 +531,16 @@ import scala.util.control.NonFatal * INTERNAL API */ @InternalApi final private[stream] class LazySink[T, M](sinkFactory: T => Future[Sink[T, M]]) - extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[M]]] { + extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { val in = Inlet[T]("lazySink.in") override def initialAttributes = DefaultAttributes.lazySink override val shape: SinkShape[T] = SinkShape.of(in) override def toString: String = "LazySink" - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val promise = Promise[Option[M]]() + val promise = Promise[M]() val stageLogic = new GraphStageLogic(shape) with InHandler { var switching = false override def preStart(): Unit = pull(in) @@ -556,7 +556,7 @@ import scala.util.control.NonFatal if (!promise.isCompleted) { try { val mat = switchTo(sink, element) - promise.success(Some(mat)) + promise.success(mat) setKeepGoing(true) } catch { case NonFatal(e) => @@ -584,7 +584,7 @@ import scala.util.control.NonFatal // there is a cached element -> the stage must not be shut down automatically because isClosed(in) is satisfied setKeepGoing(true) } else { - promise.success(None) + promise.failure(new NeverMaterializedException) super.onUpstreamFinish() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 4cec235074..64168294d2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -335,6 +335,11 @@ import scala.concurrent.{ Future, Promise } override def onUpstreamFinish(): Unit = completeStage() + override def onDownstreamFinish(cause: Throwable): Unit = { + sinkIn.cancel(cause) + super.onDownstreamFinish(cause) + } + override def postStop(): Unit = if (!sinkIn.isClosed) sinkIn.cancel() @@ -376,11 +381,13 @@ import scala.concurrent.{ Future, Promise } override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with OutHandler { def onPull(): Unit = { - if (future.isCompleted) { - onFutureCompleted(future.value.get) - } else { - val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _ - future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + future.value match { + case Some(completed) => + // optimization if the future is already completed + onFutureCompleted(completed) + case None => + val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _ + future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) } def onFutureCompleted(result: Try[T]): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index ec628c9509..897c87135f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -2080,10 +2080,13 @@ private[stream] object Collect { /** * INTERNAL API */ -@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I => Future[Flow[I, O, M]]) - extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[Option[M]]] { - val in = Inlet[I]("lazyFlow.in") - val out = Outlet[O]("lazyFlow.out") +@InternalApi private[akka] final class LazyFlow[I, O, M](flowFactory: I => Future[Flow[I, O, M]]) + extends GraphStageWithMaterializedValue[FlowShape[I, O], Future[M]] { + + // FIXME: when removing the deprecated I => Flow factories we can remove that complication from this stage + + val in = Inlet[I]("LazyFlow.in") + val out = Outlet[O]("LazyFlow.out") override def initialAttributes = DefaultAttributes.lazyFlow @@ -2091,78 +2094,84 @@ private[stream] object Collect { override def toString: String = "LazyFlow" - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - - val matPromise = Promise[Option[M]]() + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val matPromise = Promise[M]() val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - var switching = false // // implementation of handler methods in initial state // - - override def onPush(): Unit = { - val element = grab(in) - switching = true - val cb = getAsyncCallback[Try[Flow[I, O, M]]] { - case Success(flow) => - // check if the stage is still in need for the lazy flow - // (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise) - if (!matPromise.isCompleted) { - try { - val mat = switchTo(flow, element) - matPromise.success(Some(mat)) - } catch { - case NonFatal(e) => - matPromise.failure(e) - failStage(e) - } + private def onFlowFutureComplete(firstElement: I)(result: Try[Flow[I, O, M]]) = result match { + case Success(flow) => + // check if the stage is still in need for the lazy flow + // (there could have been an onUpstreamFailure or onDownstreamFinish in the meantime that has completed the promise) + if (!matPromise.isCompleted) { + try { + val mat = switchTo(flow, firstElement) + matPromise.success(mat) + } catch { + case NonFatal(e) => + matPromise.failure(e) + failStage(e) } - case Failure(e) => - matPromise.failure(e) - failStage(e) - } + } + case Failure(e) => + matPromise.failure(e) + failStage(e) + } + + override def onPush(): Unit = try { - flowFactory(element).onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext) + val element = grab(in) + switching = true + val futureFlow = flowFactory(element) + + // optimization avoid extra scheduling if already completed + futureFlow.value match { + case Some(completed) => + onFlowFutureComplete(element)(completed) + case None => + val cb = getAsyncCallback[Try[Flow[I, O, M]]](onFlowFutureComplete(element)) + futureFlow.onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext) + } } catch { case NonFatal(e) => matPromise.failure(e) failStage(e) } - } override def onUpstreamFinish(): Unit = { + if (!matPromise.isCompleted) + matPromise.tryFailure(new NeverMaterializedException) // ignore onUpstreamFinish while the stage is switching but setKeepGoing if (switching) { setKeepGoing(true) } else { - matPromise.success(None) super.onUpstreamFinish() } } override def onUpstreamFailure(ex: Throwable): Unit = { - matPromise.failure(ex) super.onUpstreamFailure(ex) - } - - override def onDownstreamFinish(cause: Throwable): Unit = { - matPromise.success(None) - super.onDownstreamFinish(cause) + if (!matPromise.isCompleted) + matPromise.tryFailure(new NeverMaterializedException(ex)) } override def onPull(): Unit = { pull(in) } + override def postStop(): Unit = { + if (!matPromise.isCompleted) + matPromise.tryFailure(new AbruptStageTerminationException(this)) + } + setHandler(in, this) setHandler(out, this) private def switchTo(flow: Flow[I, O, M], firstElement: I): M = { - var firstElementPushed = false - // // ports are wired in the following way: // @@ -2174,6 +2183,7 @@ private[stream] object Collect { val matVal = Source .fromGraph(subOutlet.source) + .prepend(Source.single(firstElement)) .viaMat(flow)(Keep.right) .toMat(subInlet.sink)(Keep.left) .run()(interpreter.subFusingMaterializer) @@ -2203,10 +2213,8 @@ private[stream] object Collect { subOutlet.push(grab(in)) } override def onUpstreamFinish(): Unit = { - if (firstElementPushed) { - subOutlet.complete() - maybeCompleteStage() - } + subOutlet.complete() + maybeCompleteStage() } override def onUpstreamFailure(ex: Throwable): Unit = { // propagate exception irrespective if the cached element has been pushed or not @@ -2227,19 +2235,7 @@ private[stream] object Collect { subOutlet.setHandler(new OutHandler { override def onPull(): Unit = { - if (firstElementPushed) { - pull(in) - } else { - // the demand can be satisfied right away by the cached element - firstElementPushed = true - subOutlet.push(firstElement) - // in.onUpstreamFinished was not propagated if it arrived before the cached element was pushed - // -> check if the completion must be propagated now - if (isClosed(in)) { - subOutlet.complete() - maybeCompleteStage() - } - } + pull(in) } override def onDownstreamFinish(cause: Throwable): Unit = { if (!isClosed(in)) { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index ee3e3685c5..3c80776150 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -9,6 +9,7 @@ import java.util.function.BiFunction import java.util.function.Supplier import java.util.Comparator import java.util.Optional +import java.util.concurrent.CompletableFuture import akka.actor.ActorRef import akka.actor.ClassicActorSystemProvider @@ -25,6 +26,7 @@ import akka.util.ConstantFun import akka.util.Timeout import akka.Done import akka.NotUsed +import akka.japi.function.Creator import com.github.ghik.silencer.silent import org.reactivestreams.Processor @@ -253,10 +255,9 @@ object Flow { * * '''Cancels when''' downstream cancels */ - @Deprecated @deprecated( - "Use lazyInitAsync instead. (lazyInitAsync returns a flow with a more useful materialized value.)", - "2.5.12") + "Use 'Flow.completionStageFlow' in combination with prefixAndTail(1) instead, see `completionStageFlow` operator docs for details", + "2.6.0") def lazyInit[I, O, M]( flowFactory: function.Function[I, CompletionStage[Flow[I, O, M]]], fallback: function.Creator[M]): Flow[I, O, M] = { @@ -284,6 +285,7 @@ object Flow { * * '''Cancels when''' downstream cancels */ + @deprecated("Use 'Flow.lazyCompletionStageFlow' instead", "2.6.0") def lazyInitAsync[I, O, M]( flowFactory: function.Creator[CompletionStage[Flow[I, O, M]]]): Flow[I, O, CompletionStage[Optional[M]]] = { import scala.compat.java8.FutureConverters._ @@ -299,6 +301,63 @@ object Flow { new Flow(sflow) } + /** + * Turn a `CompletionStage` into a flow that will consume the values of the source when the future completes successfully. + * If the `Future` is completed with a failure the stream is failed. + * + * The materialized completion stage value is completed with the materialized value of the future flow or failed with a + * [[NeverMaterializedException]] if upstream fails or downstream cancels before the completion stage has completed. + */ + def completionStageFlow[I, O, M](flow: CompletionStage[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = + lazyCompletionStageFlow(() => flow) + + /** + * Defers invoking the `create` function to create a future flow until there is downstream demand and passing + * that downstream demand upstream triggers the first element. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and can trigger the factory earlier than expected. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures or downstream backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyFlow[I, O, M](create: Creator[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = + lazyCompletionStageFlow(() => CompletableFuture.completedFuture(create.create())) + + /** + * Defers invoking the `create` function to create a future flow until there downstream demand has caused upstream + * to send a first element. + * + * The materialized future value is completed with the materialized value of the created flow when that has successfully + * been materialized. + * + * If the `create` function throws or returns a future that fails the stream is failed, in this case the materialized + * future value is failed with a [[NeverMaterializedException]]. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and can trigger the factory earlier than expected. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures or downstream backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyCompletionStageFlow[I, O, M]( + create: Creator[CompletionStage[Flow[I, O, M]]]): Flow[I, O, CompletionStage[M]] = + scaladsl.Flow + .lazyFutureFlow[I, O, M](() => + create.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) + .mapMaterializedValue(_.toJava) + .asJava + /** * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with * fan-in operators where you do not want to pay the cost of casting each element in a `map`. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 25e01e1142..1aef0477f6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -5,6 +5,7 @@ package akka.stream.javadsl import java.util.Optional +import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage import java.util.function.BiFunction @@ -12,6 +13,7 @@ import akka.actor.{ ActorRef, ClassicActorSystemProvider, Status } import akka.dispatch.ExecutionContexts import akka._ import akka.japi.function +import akka.japi.function.Creator import akka.stream.impl.LinearTraversalBuilder import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } @@ -360,10 +362,7 @@ object Sink { * sink fails then the `Future` is completed with the exception. * Otherwise the `Future` is completed with the materialized value of the internal sink. */ - @Deprecated - @deprecated( - "Use lazyInitAsync instead. (lazyInitAsync no more needs a fallback function and the materialized value more clearly indicates if the internal sink was materialized or not.)", - "2.5.11") + @deprecated("Use 'Sink.lazyCompletionStageSink' in combination with 'Flow.prefixAndTail(1)' instead", "2.6.0") def lazyInit[T, M]( sinkFactory: function.Function[T, CompletionStage[Sink[T, M]]], fallback: function.Creator[M]): Sink[T, CompletionStage[M]] = @@ -383,6 +382,7 @@ object Sink { * sink fails then the `Future` is completed with the exception. * Otherwise the `Future` is completed with the materialized value of the internal sink. */ + @deprecated("Use 'Sink.lazyCompletionStageSink' instead", "2.6.0") def lazyInitAsync[T, M]( sinkFactory: function.Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[Optional[M]]] = { val sSink = scaladsl.Sink @@ -395,6 +395,42 @@ object Sink { .toJava) new Sink(sSink) } + + /** + * Turn a `Future[Sink]` into a Sink that will consume the values of the source when the future completes successfully. + * If the `Future` is completed with a failure the stream is failed. + * + * The materialized future value is completed with the materialized value of the future sink or failed with a + * [[NeverMaterializedException]] if upstream fails or downstream cancels before the future has completed. + */ + def completionStageSink[T, M](future: CompletionStage[Sink[T, M]]): Sink[T, CompletionStage[M]] = + lazyCompletionStageSink[T, M](() => future) + + /** + * Defers invoking the `create` function to create a sink until there is a first element passed from upstream. + * + * The materialized future value is completed with the materialized value of the created sink when that has successfully + * been materialized. + * + * If the `create` function throws or returns or the stream fails to materialize, in this + * case the materialized future value is failed with a [[akka.stream.NeverMaterializedException]]. + */ + def lazySink[T, M](create: Creator[Sink[T, M]]): Sink[T, CompletionStage[M]] = + lazyCompletionStageSink(() => CompletableFuture.completedFuture(create.create)) + + /** + * Defers invoking the `create` function to create a future sink until there is a first element passed from upstream. + * + * The materialized future value is completed with the materialized value of the created sink when that has successfully + * been materialized. + * + * If the `create` function throws or returns a future that is failed, or the stream fails to materialize, in this + * case the materialized future value is failed with a [[akka.stream.NeverMaterializedException]]. + */ + def lazyCompletionStageSink[T, M](create: Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[M]] = + new Sink(scaladsl.Sink.lazyFutureSink { () => + create.create().toScala.map(_.asScala)((ExecutionContexts.sameThreadExecutionContext)) + }).mapMaterializedValue(_.toJava) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c50e98436f..1a6c243aa0 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -10,7 +10,9 @@ import java.util.concurrent.{ CompletableFuture, CompletionStage } import java.util.function.{ BiFunction, Supplier } import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } +import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter +import akka.japi.function.Creator import akka.japi.{ function, JavaPartialFunction, Pair, Util } import akka.stream._ import akka.stream.impl.LinearTraversalBuilder @@ -171,8 +173,9 @@ object Source { * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. */ + @deprecated("Use 'Source.future' instead", "2.6.0") def fromFuture[O](future: Future[O]): javadsl.Source[O, NotUsed] = - new Source(scaladsl.Source.fromFuture(future)) + new Source(scaladsl.Source.future(future)) /** * Starts a new `Source` from the given `CompletionStage`. The stream will consist of @@ -180,14 +183,16 @@ object Source { * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `CompletionStage` is completed with a failure. */ + @deprecated("Use 'Source.completionStage' instead", "2.6.0") def fromCompletionStage[O](future: CompletionStage[O]): javadsl.Source[O, NotUsed] = - new Source(scaladsl.Source.fromCompletionStage(future)) + new Source(scaladsl.Source.completionStage(future)) /** * Streams the elements of the given future source once it successfully completes. * If the [[Future]] fails the stream is failed with the exception from the future. If downstream cancels before the * stream completes the materialized [[Future]] will be failed with a [[StreamDetachedException]]. */ + @deprecated("Use 'Source.futureSource' (potentially together with `Source.fromGraph`) instead", "2.6.0") def fromFutureSource[T, M](future: Future[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future)) @@ -197,9 +202,10 @@ object Source { * If downstream cancels before the stream completes the materialized [[CompletionStage]] will be failed * with a [[StreamDetachedException]] */ + @deprecated("Use 'Source.completionStageSource' (potentially together with `Source.fromGraph`) instead", "2.6.0") def fromSourceCompletionStage[T, M]( completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = - new Source(scaladsl.Source.fromSourceCompletionStage(completion)) + completionStageSource(completion.thenApply(fromGraph[T, M])) /** * Elements are emitted periodically with the specified interval. @@ -262,6 +268,7 @@ object Source { * the materialized future is completed with its value, if downstream cancels or fails without any demand the * `create` factory is never called and the materialized `CompletionStage` is failed. */ + @deprecated("Use 'Source.lazySource' instead", "2.6.0") def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] = scaladsl.Source.lazily[T, M](() => create.create().asScala).mapMaterializedValue(_.toJava).asJava @@ -272,9 +279,115 @@ object Source { * * @see [[Source.lazily]] */ + @deprecated("Use 'Source.lazyCompletionStage' instead", "2.6.0") def lazilyAsync[T](create: function.Creator[CompletionStage[T]]): Source[T, Future[NotUsed]] = scaladsl.Source.lazilyAsync[T](() => create.create().toScala).asJava + /** + * Emits a single value when the given Scala `Future` is successfully completed and then completes the stream. + * The stream fails if the `Future` is completed with a failure. + * + * Here for Java interoperability, the normal use from Java should be [[Source.completionStage]] + */ + def future[T](futureElement: Future[T]): Source[T, NotUsed] = + scaladsl.Source.future(futureElement).asJava + + /** + * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream. + * If the `CompletionStage` is completed with a failure the stream is failed. + */ + def completionStage[T](completionStage: CompletionStage[T]): Source[T, NotUsed] = + future(completionStage.toScala) + + /** + * Turn a `CompletionStage[Source]` into a source that will emit the values of the source when the future completes successfully. + * If the `CompletionStage` is completed with a failure the stream is failed. + */ + def completionStageSource[T, M](completionStageSource: CompletionStage[Source[T, M]]): Source[T, CompletionStage[M]] = + scaladsl.Source + .futureSource(completionStageSource.toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) + .mapMaterializedValue(_.toJava) + .asJava + + /** + * Defers invoking the `create` function to create a single element until there is downstream demand. + * + * If the `create` function fails when invoked the stream is failed. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and will trigger the factory immediately. + * + * The materialized future `Done` value is completed when the `create` function has successfully been invoked, + * if the function throws the future materialized value is failed with that exception. + * If downstream cancels or fails before the function is invoked the materialized value + * is failed with a [[akka.stream.NeverMaterializedException]] + */ + def lazySingle[T](create: Creator[T]): Source[T, NotUsed] = + lazySource(() => single(create.create())).mapMaterializedValue(_ => NotUsed) + + /** + * Defers invoking the `create` function to create a future element until there is downstream demand. + * + * The returned future element will be emitted downstream when it completes, or fail the stream if the future + * is failed or the `create` function itself fails. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and will trigger the factory immediately. + * + * The materialized future `Done` value is completed when the `create` function has successfully been invoked and the future completes, + * if the function throws or the future fails the future materialized value is failed with that exception. + * If downstream cancels or fails before the function is invoked the materialized value + * is failed with a [[akka.stream.NeverMaterializedException]] + */ + def lazyCompletionStage[T](create: Creator[CompletionStage[T]]): Source[T, NotUsed] = + scaladsl.Source + .lazySource { () => + val f = create.create().toScala + scaladsl.Source.future(f) + } + .mapMaterializedValue(_ => NotUsed.notUsed()) + .asJava + + /** + * Defers invoking the `create` function to create a future source until there is downstream demand. + * + * The returned source will emit downstream and behave just like it was the outer source. Downstream completes + * when the created source completes and fails when the created source fails. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and will trigger the factory immediately. + * + * The materialized future value is completed with the materialized value of the created source when + * it has been materialized. If the function throws or the source materialization fails the future materialized value + * is failed with the thrown exception. + * + * If downstream cancels or fails before the function is invoked the materialized value + * is failed with a [[akka.stream.NeverMaterializedException]] + */ + def lazySource[T, M](create: Creator[Source[T, M]]): Source[T, CompletionStage[M]] = + scaladsl.Source.lazySource(() => create.create().asScala).mapMaterializedValue(_.toJava).asJava + + /** + * Defers invoking the `create` function to create a future source until there is downstream demand. + * + * The returned future source will emit downstream and behave just like it was the outer source when the `CompletionStage` completes + * successfully. Downstream completes when the created source completes and fails when the created source fails. + * If the `CompletionStage` or the `create` function fails the stream is failed. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and triggers the factory immediately. + * + * The materialized `CompletionStage` value is completed with the materialized value of the created source when + * it has been materialized. If the function throws or the source materialization fails the future materialized value + * is failed with the thrown exception. + * + * If downstream cancels or fails before the function is invoked the materialized value + * is failed with a [[akka.stream.NeverMaterializedException]] + */ + def lazyCompletionStageSource[T, M](create: Creator[CompletionStage[Source[T, M]]]): Source[T, CompletionStage[M]] = + lazySource[T, CompletionStage[M]](() => completionStageSource(create.create())) + .mapMaterializedValue(_.thenCompose(csm => csm)) + /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 3460625f82..2115ec95b5 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -581,10 +581,9 @@ object Flow { * * '''Cancels when''' downstream cancels */ - @Deprecated @deprecated( - "Use lazyInitAsync instead. (lazyInitAsync returns a flow with a more useful materialized value.)", - "2.5.12") + "Use 'Flow.futureFlow' in combination with prefixAndTail(1) instead, see `futureFlow` operator docs for details", + "2.6.0") def lazyInit[I, O, M](flowFactory: I => Future[Flow[I, O, M]], fallback: () => M): Flow[I, O, M] = Flow.fromGraph(new LazyFlow[I, O, M](flowFactory)).mapMaterializedValue(_ => fallback()) @@ -604,8 +603,71 @@ object Flow { * * '''Cancels when''' downstream cancels */ + @deprecated("Use 'Flow.lazyFutureFlow' instead", "2.6.0") def lazyInitAsync[I, O, M](flowFactory: () => Future[Flow[I, O, M]]): Flow[I, O, Future[Option[M]]] = - Flow.fromGraph(new LazyFlow[I, O, M](_ => flowFactory())) + Flow.fromGraph(new LazyFlow[I, O, M](_ => flowFactory())).mapMaterializedValue { v => + implicit val ec = akka.dispatch.ExecutionContexts.sameThreadExecutionContext + v.map[Option[M]](Some.apply _).recover { case _: NeverMaterializedException => None } + } + + /** + * Turn a `Future[Flow]` into a flow that will consume the values of the source when the future completes successfully. + * If the `Future` is completed with a failure the stream is failed. + * + * The materialized future value is completed with the materialized value of the future flow or failed with a + * [[NeverMaterializedException]] if upstream fails or downstream cancels before the future has completed. + */ + def futureFlow[I, O, M](flow: Future[Flow[I, O, M]]): Flow[I, O, Future[M]] = + lazyFutureFlow(() => flow) + + /** + * Defers invoking the `create` function to create a future flow until there is downstream demand and passing + * that downstream demand upstream triggers the first element. + * + * The materialized future value is completed with the materialized value of the created flow when that has successfully + * been materialized. + * + * If the `create` function throws or returns a future that fails the stream is failed, in this case the materialized + * future value is failed with a [[NeverMaterializedException]]. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and can trigger the factory earlier than expected. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures or downstream backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyFlow[I, O, M](create: () => Flow[I, O, M]): Flow[I, O, Future[M]] = + lazyFutureFlow(() => Future.successful(create())) + + /** + * Defers invoking the `create` function to create a future flow until there downstream demand has caused upstream + * to send a first element. + * + * The materialized future value is completed with the materialized value of the created flow when that has successfully + * been materialized. + * + * If the `create` function throws or returns a future that fails the stream is failed, in this case the materialized + * future value is failed with a [[NeverMaterializedException]]. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and can trigger the factory earlier than expected. + * + * '''Emits when''' the internal flow is successfully created and it emits + * + * '''Backpressures when''' the internal flow is successfully created and it backpressures or downstream backpressures + * + * '''Completes when''' upstream completes and all elements have been emitted from the internal flow + * + * '''Cancels when''' downstream cancels + */ + def lazyFutureFlow[I, O, M](create: () => Future[Flow[I, O, M]]): Flow[I, O, Future[M]] = + Flow.fromGraph(new LazyFlow(_ => create())) + } object RunnableGraph { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 4fc075265b..0b293264cd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -583,14 +583,12 @@ object Sink { * sink fails then the `Future` is completed with the exception. * Otherwise the `Future` is completed with the materialized value of the internal sink. */ - @Deprecated - @deprecated( - "Use lazyInitAsync instead. (lazyInitAsync no more needs a fallback function and the materialized value more clearly indicates if the internal sink was materialized or not.)", - "2.5.11") + @deprecated("Use 'Sink.lazyFutureSink' in combination with 'Flow.prefixAndTail(1)' instead", "2.6.0") def lazyInit[T, M](sinkFactory: T => Future[Sink[T, M]], fallback: () => M): Sink[T, Future[M]] = Sink .fromGraph(new LazySink[T, M](sinkFactory)) - .mapMaterializedValue(_.map(_.getOrElse(fallback()))(ExecutionContexts.sameThreadExecutionContext)) + .mapMaterializedValue( + _.recover { case _: NeverMaterializedException => fallback() }(ExecutionContexts.sameThreadExecutionContext)) /** * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, @@ -601,7 +599,45 @@ object Sink { * sink fails then the `Future` is completed with the exception. * Otherwise the `Future` is completed with the materialized value of the internal sink. */ + @deprecated("Use 'Sink.lazyFutureSink' instead", "2.6.0") def lazyInitAsync[T, M](sinkFactory: () => Future[Sink[T, M]]): Sink[T, Future[Option[M]]] = - Sink.fromGraph(new LazySink[T, M](_ => sinkFactory())) + Sink.fromGraph(new LazySink[T, M](_ => sinkFactory())).mapMaterializedValue { m => + implicit val ec = ExecutionContexts.sameThreadExecutionContext + m.map(Option.apply _).recover { case _: NeverMaterializedException => None } + } + + /** + * Turn a `Future[Sink]` into a Sink that will consume the values of the source when the future completes successfully. + * If the `Future` is completed with a failure the stream is failed. + * + * The materialized future value is completed with the materialized value of the future sink or failed with a + * [[NeverMaterializedException]] if upstream fails or downstream cancels before the future has completed. + */ + def futureSink[T, M](future: Future[Sink[T, M]]): Sink[T, Future[M]] = + lazyFutureSink[T, M](() => future) + + /** + * Defers invoking the `create` function to create a sink until there is a first element passed from upstream. + * + * The materialized future value is completed with the materialized value of the created sink when that has successfully + * been materialized. + * + * If the `create` function throws or returns or the stream fails to materialize, in this + * case the materialized future value is failed with a [[akka.stream.NeverMaterializedException]]. + */ + def lazySink[T, M](create: () => Sink[T, M]): Sink[T, Future[M]] = + lazyFutureSink(() => Future.successful(create())) + + /** + * Defers invoking the `create` function to create a future sink until there is a first element passed from upstream. + * + * The materialized future value is completed with the materialized value of the created sink when that has successfully + * been materialized. + * + * If the `create` function throws or returns a future that is failed, or the stream fails to materialize, in this + * case the materialized future value is failed with a [[akka.stream.NeverMaterializedException]]. + */ + def lazyFutureSink[T, M](create: () => Future[Sink[T, M]]): Sink[T, Future[M]] = + Sink.fromGraph(new LazySink(_ => create())) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index a721a046a0..1034f23220 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -16,13 +16,14 @@ import akka.stream.{ Outlet, SourceShape, _ } import akka.util.ConstantFun import akka.{ Done, NotUsed } import org.reactivestreams.{ Publisher, Subscriber } + import scala.annotation.tailrec import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } - import akka.stream.stage.GraphStageWithMaterializedValue + import scala.compat.java8.FutureConverters._ /** @@ -341,6 +342,7 @@ object Source { * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. */ + @deprecated("Use 'Source.future' instead", "2.6.0") def fromFuture[T](future: Future[T]): Source[T, NotUsed] = fromGraph(new FutureSource(future)) @@ -350,6 +352,7 @@ object Source { * may happen before or after materializing the `Flow`. * The stream terminates with a failure if the `Future` is completed with a failure. */ + @deprecated("Use 'Source.completionStage' instead", "2.6.0") def fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed] = fromGraph(new FutureSource(future.toScala)) @@ -358,6 +361,7 @@ object Source { * If the [[Future]] fails the stream is failed with the exception from the future. If downstream cancels before the * stream completes the materialized `Future` will be failed with a [[StreamDetachedException]] */ + @deprecated("Use 'Source.futureSource' (potentially together with `Source.fromGraph`) instead", "2.6.0") def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = fromGraph(new FutureFlattenSource(future)) @@ -367,6 +371,7 @@ object Source { * If downstream cancels before the stream completes the materialized `Future` will be failed * with a [[StreamDetachedException]] */ + @deprecated("Use scala-compat CompletionStage to future converter and 'Source.futureSource' instead", "2.6.0") def fromSourceCompletionStage[T, M]( completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava) @@ -462,6 +467,7 @@ object Source { * the materialized future is completed with its value, if downstream cancels or fails without any demand the * create factory is never called and the materialized `Future` is failed. */ + @deprecated("Use 'Source.lazySource' instead", "2.6.0") def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] = Source.fromGraph(new LazySource[T, M](create)) @@ -472,9 +478,98 @@ object Source { * * @see [[Source.lazily]] */ + @deprecated("Use 'Source.lazyFuture' instead", "2.6.0") def lazilyAsync[T](create: () => Future[T]): Source[T, Future[NotUsed]] = lazily(() => fromFuture(create())) + /** + * Emits a single value when the given `Future` is successfully completed and then completes the stream. + * The stream fails if the `Future` is completed with a failure. + */ + def future[T](futureElement: Future[T]): Source[T, NotUsed] = + fromGraph(new FutureSource[T](futureElement)) + + /** + * Emits a single value when the given `CompletionStage` is successfully completed and then completes the stream. + * If the `CompletionStage` is completed with a failure the stream is failed. + * + * Here for Java interoperability, the normal use from Scala should be [[Source.future]] + */ + def completionStage[T](completionStage: CompletionStage[T]): Source[T, NotUsed] = + future(completionStage.toScala) + + /** + * Turn a `Future[Source]` into a source that will emit the values of the source when the future completes successfully. + * If the `Future` is completed with a failure the stream is failed. + */ + def futureSource[T, M](futureSource: Future[Source[T, M]]): Source[T, Future[M]] = + fromGraph(new FutureFlattenSource(futureSource)) + + /** + * Defers invoking the `create` function to create a single element until there is downstream demand. + * + * If the `create` function fails when invoked the stream is failed. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and will trigger the factory immediately. + */ + def lazySingle[T](create: () => T): Source[T, NotUsed] = + lazySource(() => single(create())).mapMaterializedValue(_ => NotUsed) + + /** + * Defers invoking the `create` function to create a future element until there is downstream demand. + * + * The returned future element will be emitted downstream when it completes, or fail the stream if the future + * is failed or the `create` function itself fails. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and will trigger the factory immediately. + */ + def lazyFuture[T](create: () => Future[T]): Source[T, NotUsed] = + lazySource { () => + val f = create() + future(f) + }.mapMaterializedValue(_ => NotUsed) + + /** + * Defers invoking the `create` function to create a future source until there is downstream demand. + * + * The returned source will emit downstream and behave just like it was the outer source. Downstream completes + * when the created source completes and fails when the created source fails. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and will trigger the factory immediately. + * + * The materialized future value is completed with the materialized value of the created source when + * it has been materialized. If the function throws or the source materialization fails the future materialized value + * is failed with the thrown exception. + * + * If downstream cancels or fails before the function is invoked the materialized value + * is failed with a [[akka.stream.NeverMaterializedException]] + */ + def lazySource[T, M](create: () => Source[T, M]): Source[T, Future[M]] = + fromGraph(new LazySource(create)) + + /** + * Defers invoking the `create` function to create a future source until there is downstream demand. + * + * The returned future source will emit downstream and behave just like it was the outer source when the future completes + * successfully. Downstream completes when the created source completes and fails when the created source fails. + * If the future or the `create` function fails the stream is failed. + * + * Note that asynchronous boundaries (and other operators) in the stream may do pre-fetching which counter acts + * the laziness and triggers the factory immediately. + * + * The materialized future value is completed with the materialized value of the created source when + * it has been materialized. If the function throws or the source materialization fails the future materialized value + * is failed with the thrown exception. + * + * If downstream cancels or fails before the function is invoked the materialized value + * is failed with a [[akka.stream.NeverMaterializedException]] + */ + def lazyFutureSource[T, M](create: () => Future[Source[T, M]]): Source[T, Future[M]] = + lazySource(() => futureSource(create())).mapMaterializedValue(_.flatten) + /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */