From 6b5d544debe6bb5c5130097db87ef6ac3d61c1a8 Mon Sep 17 00:00:00 2001 From: kerr Date: Tue, 28 Apr 2020 17:15:11 +0800 Subject: [PATCH] Add Source.fromJavaStream as convenient method alias (#28881) --- .../stream/operators/Source/fromJavaStream.md | 39 +++++++++++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../jdocs/stream/operators/source/From.java | 12 ++++++ .../docs/stream/operators/source/From.scala | 12 ++++++ .../scala/akka/stream/javadsl/Source.scala | 11 ++++++ .../scala/akka/stream/scaladsl/Source.scala | 11 ++++++ 6 files changed, 87 insertions(+) create mode 100644 akka-docs/src/main/paradox/stream/operators/Source/fromJavaStream.md diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromJavaStream.md b/akka-docs/src/main/paradox/stream/operators/Source/fromJavaStream.md new file mode 100644 index 0000000000..5b7fae6d8d --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromJavaStream.md @@ -0,0 +1,39 @@ +# fromJavaStream + +Stream the values from a Java 8 `Stream`, requesting the next value when there is demand. + +@ref[Source operators](../index.md#source-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #fromJavaStream } + +@@@ + +## Description + +Stream the values from a Java 8 `Stream`, requesting the next value when there is demand. The iterator will be created anew +for each materialization, which is the reason the @scala[`method`] @java[`factory`] takes a @scala[`function`] @java[`Creator`] rather than an `Stream` directly. + + You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream and the rest of flow. +## Example + +Scala +: @@snip [From.scala](/akka-docs/src/test/scala/docs/stream/operators/source/From.scala) { #from-javaStream } + +Java +: @@snip [From.java](/akka-docs/src/test/java/jdocs/stream/operators/source/From.java) { #from-javaStream } + + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the next value returned from the iterator + +**completes** when the iterator reaches its end + +@@@ + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index e96095d028..bf3191c578 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -22,6 +22,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).| |Source|@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).| |Source|@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.| +|Source|@ref[fromJavaStream](Source/fromJavaStream.md)|Stream the values from a Java 8 `Stream`, requesting the next value when there is demand.| |Source|@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).| |Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).| |Source|@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.| @@ -353,6 +354,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [asSourceWithContext](Source/asSourceWithContext.md) * [fromPublisher](Source/fromPublisher.md) * [fromIterator](Source/fromIterator.md) +* [fromJavaStream](Source/fromJavaStream.md) * [cycle](Source/cycle.md) * [fromMaterializer](Source-or-Flow/fromMaterializer.md) * [setup](Source-or-Flow/setup.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/From.java b/akka-docs/src/test/java/jdocs/stream/operators/source/From.java index b1dbef700f..ee917d5dcb 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/source/From.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/From.java @@ -8,6 +8,7 @@ import akka.actor.ActorSystem; import akka.stream.javadsl.Source; import java.util.Arrays; +import java.util.stream.IntStream; public class From { @@ -23,4 +24,15 @@ public class From { // 3 // #from-iterator } + + void fromJavaStreamSample() { + // #from-javaStream + Source.fromJavaStream(() -> IntStream.rangeClosed(1, 3)) + .runForeach(System.out::println, system); + // could print + // 1 + // 2 + // 3 + // #from-javaStream + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/From.scala b/akka-docs/src/test/scala/docs/stream/operators/source/From.scala index 7d034a75d6..be29fe5573 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/source/From.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/source/From.scala @@ -4,6 +4,8 @@ package docs.stream.operators.source +import java.util.stream.IntStream + import akka.actor.ActorSystem import akka.stream.scaladsl.Source @@ -21,4 +23,14 @@ object From { //#from-iterator } + def fromJavaStreamSample(): Unit = { + //#from-javaStream + Source.fromJavaStream(() => IntStream.rangeClosed(1, 3)).runForeach(println) + // could print + // 1 + // 2 + // 3 + //#from-javaStream + } + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index e165ca33c7..7356dd0acc 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -100,6 +100,17 @@ object Source { def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = new Source(scaladsl.Source.fromIterator(() => f.create().asScala)) + /** + * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its + * elements and send them downstream on demand. + * + * You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream + * and the rest of flow. + */ + def fromJavaStream[O, S <: java.util.stream.BaseStream[O, S]]( + stream: function.Creator[java.util.stream.BaseStream[O, S]]): javadsl.Source[O, NotUsed] = + StreamConverters.fromJavaStream(stream) + /** * Helper to create 'cycled' [[Source]] from iterator provider. * Example usage: diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index d399ef291e..18bb3f3cd3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -285,6 +285,17 @@ object Source { override def toString: String = "() => Iterator" }) + /** + * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its + * elements and send them downstream on demand. + * + * You can use [[Source.async]] to create asynchronous boundaries between synchronous Java ``Stream`` + * and the rest of flow. + */ + def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]]( + stream: () => java.util.stream.BaseStream[T, S]): Source[T, NotUsed] = + StreamConverters.fromJavaStream(stream); + /** * Creates [[Source]] that will continually produce given elements in specified order. *