Add Source.fromJavaStream as convenient method alias (#28881)

This commit is contained in:
kerr 2020-04-28 17:15:11 +08:00 committed by GitHub
parent 404d45ea82
commit 6b5d544deb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 0 deletions

View file

@ -0,0 +1,39 @@
# fromJavaStream
Stream the values from a Java 8 `Stream`, requesting the next value when there is demand.
@ref[Source operators](../index.md#source-operators)
@@@div { .group-scala }
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #fromJavaStream }
@@@
## Description
Stream the values from a Java 8 `Stream`, requesting the next value when there is demand. The iterator will be created anew
for each materialization, which is the reason the @scala[`method`] @java[`factory`] takes a @scala[`function`] @java[`Creator`] rather than an `Stream` directly.
You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream and the rest of flow.
## Example
Scala
: @@snip [From.scala](/akka-docs/src/test/scala/docs/stream/operators/source/From.scala) { #from-javaStream }
Java
: @@snip [From.java](/akka-docs/src/test/java/jdocs/stream/operators/source/From.java) { #from-javaStream }
## Reactive Streams semantics
@@@div { .callout }
**emits** the next value returned from the iterator
**completes** when the iterator reaches its end
@@@

View file

@ -22,6 +22,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|<a name="fromfuture"></a>@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).| |Source|<a name="fromfuture"></a>@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).|
|Source|<a name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).| |Source|<a name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).|
|Source|<a name="fromiterator"></a>@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.| |Source|<a name="fromiterator"></a>@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.|
|Source|<a name="fromjavastream"></a>@ref[fromJavaStream](Source/fromJavaStream.md)|Stream the values from a Java 8 `Stream`, requesting the next value when there is demand.|
|Source|<a name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).| |Source|<a name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).|
|Source|<a name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).| |Source|<a name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.| |Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.|
@ -353,6 +354,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [asSourceWithContext](Source/asSourceWithContext.md) * [asSourceWithContext](Source/asSourceWithContext.md)
* [fromPublisher](Source/fromPublisher.md) * [fromPublisher](Source/fromPublisher.md)
* [fromIterator](Source/fromIterator.md) * [fromIterator](Source/fromIterator.md)
* [fromJavaStream](Source/fromJavaStream.md)
* [cycle](Source/cycle.md) * [cycle](Source/cycle.md)
* [fromMaterializer](Source-or-Flow/fromMaterializer.md) * [fromMaterializer](Source-or-Flow/fromMaterializer.md)
* [setup](Source-or-Flow/setup.md) * [setup](Source-or-Flow/setup.md)

View file

@ -8,6 +8,7 @@ import akka.actor.ActorSystem;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import java.util.Arrays; import java.util.Arrays;
import java.util.stream.IntStream;
public class From { public class From {
@ -23,4 +24,15 @@ public class From {
// 3 // 3
// #from-iterator // #from-iterator
} }
void fromJavaStreamSample() {
// #from-javaStream
Source.fromJavaStream(() -> IntStream.rangeClosed(1, 3))
.runForeach(System.out::println, system);
// could print
// 1
// 2
// 3
// #from-javaStream
}
} }

View file

@ -4,6 +4,8 @@
package docs.stream.operators.source package docs.stream.operators.source
import java.util.stream.IntStream
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
@ -21,4 +23,14 @@ object From {
//#from-iterator //#from-iterator
} }
def fromJavaStreamSample(): Unit = {
//#from-javaStream
Source.fromJavaStream(() => IntStream.rangeClosed(1, 3)).runForeach(println)
// could print
// 1
// 2
// 3
//#from-javaStream
}
} }

View file

@ -100,6 +100,17 @@ object Source {
def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] = def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.fromIterator(() => f.create().asScala)) new Source(scaladsl.Source.fromIterator(() => f.create().asScala))
/**
* Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
* elements and send them downstream on demand.
*
* You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream
* and the rest of flow.
*/
def fromJavaStream[O, S <: java.util.stream.BaseStream[O, S]](
stream: function.Creator[java.util.stream.BaseStream[O, S]]): javadsl.Source[O, NotUsed] =
StreamConverters.fromJavaStream(stream)
/** /**
* Helper to create 'cycled' [[Source]] from iterator provider. * Helper to create 'cycled' [[Source]] from iterator provider.
* Example usage: * Example usage:

View file

@ -285,6 +285,17 @@ object Source {
override def toString: String = "() => Iterator" override def toString: String = "() => Iterator"
}) })
/**
* Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
* elements and send them downstream on demand.
*
* You can use [[Source.async]] to create asynchronous boundaries between synchronous Java ``Stream``
* and the rest of flow.
*/
def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]](
stream: () => java.util.stream.BaseStream[T, S]): Source[T, NotUsed] =
StreamConverters.fromJavaStream(stream);
/** /**
* Creates [[Source]] that will continually produce given elements in specified order. * Creates [[Source]] that will continually produce given elements in specified order.
* *