diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromJavaStream.md b/akka-docs/src/main/paradox/stream/operators/Source/fromJavaStream.md
new file mode 100644
index 0000000000..5b7fae6d8d
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source/fromJavaStream.md
@@ -0,0 +1,39 @@
+# fromJavaStream
+
+Stream the values from a Java 8 `Stream`, requesting the next value when there is demand.
+
+@ref[Source operators](../index.md#source-operators)
+
+@@@div { .group-scala }
+
+## Signature
+
+@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #fromJavaStream }
+
+@@@
+
+## Description
+
+Stream the values from a Java 8 `Stream`, requesting the next value when there is demand. The iterator will be created anew
+for each materialization, which is the reason the @scala[`method`] @java[`factory`] takes a @scala[`function`] @java[`Creator`] rather than an `Stream` directly.
+
+ You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream and the rest of flow.
+## Example
+
+Scala
+: @@snip [From.scala](/akka-docs/src/test/scala/docs/stream/operators/source/From.scala) { #from-javaStream }
+
+Java
+: @@snip [From.java](/akka-docs/src/test/java/jdocs/stream/operators/source/From.java) { #from-javaStream }
+
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** the next value returned from the iterator
+
+**completes** when the iterator reaches its end
+
+@@@
+
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index e96095d028..bf3191c578 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -22,6 +22,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|Source|@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).|
|Source|@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).|
|Source|@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.|
+|Source|@ref[fromJavaStream](Source/fromJavaStream.md)|Stream the values from a Java 8 `Stream`, requesting the next value when there is demand.|
|Source|@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).|
|Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
|Source|@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.|
@@ -353,6 +354,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [asSourceWithContext](Source/asSourceWithContext.md)
* [fromPublisher](Source/fromPublisher.md)
* [fromIterator](Source/fromIterator.md)
+* [fromJavaStream](Source/fromJavaStream.md)
* [cycle](Source/cycle.md)
* [fromMaterializer](Source-or-Flow/fromMaterializer.md)
* [setup](Source-or-Flow/setup.md)
diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/From.java b/akka-docs/src/test/java/jdocs/stream/operators/source/From.java
index b1dbef700f..ee917d5dcb 100644
--- a/akka-docs/src/test/java/jdocs/stream/operators/source/From.java
+++ b/akka-docs/src/test/java/jdocs/stream/operators/source/From.java
@@ -8,6 +8,7 @@ import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.Arrays;
+import java.util.stream.IntStream;
public class From {
@@ -23,4 +24,15 @@ public class From {
// 3
// #from-iterator
}
+
+ void fromJavaStreamSample() {
+ // #from-javaStream
+ Source.fromJavaStream(() -> IntStream.rangeClosed(1, 3))
+ .runForeach(System.out::println, system);
+ // could print
+ // 1
+ // 2
+ // 3
+ // #from-javaStream
+ }
}
diff --git a/akka-docs/src/test/scala/docs/stream/operators/source/From.scala b/akka-docs/src/test/scala/docs/stream/operators/source/From.scala
index 7d034a75d6..be29fe5573 100644
--- a/akka-docs/src/test/scala/docs/stream/operators/source/From.scala
+++ b/akka-docs/src/test/scala/docs/stream/operators/source/From.scala
@@ -4,6 +4,8 @@
package docs.stream.operators.source
+import java.util.stream.IntStream
+
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
@@ -21,4 +23,14 @@ object From {
//#from-iterator
}
+ def fromJavaStreamSample(): Unit = {
+ //#from-javaStream
+ Source.fromJavaStream(() => IntStream.rangeClosed(1, 3)).runForeach(println)
+ // could print
+ // 1
+ // 2
+ // 3
+ //#from-javaStream
+ }
+
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index e165ca33c7..7356dd0acc 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -100,6 +100,17 @@ object Source {
def fromIterator[O](f: function.Creator[java.util.Iterator[O]]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.fromIterator(() => f.create().asScala))
+ /**
+ * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
+ * elements and send them downstream on demand.
+ *
+ * You can use [[Source.async]] to create asynchronous boundaries between synchronous java stream
+ * and the rest of flow.
+ */
+ def fromJavaStream[O, S <: java.util.stream.BaseStream[O, S]](
+ stream: function.Creator[java.util.stream.BaseStream[O, S]]): javadsl.Source[O, NotUsed] =
+ StreamConverters.fromJavaStream(stream)
+
/**
* Helper to create 'cycled' [[Source]] from iterator provider.
* Example usage:
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
index d399ef291e..18bb3f3cd3 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
@@ -285,6 +285,17 @@ object Source {
override def toString: String = "() => Iterator"
})
+ /**
+ * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
+ * elements and send them downstream on demand.
+ *
+ * You can use [[Source.async]] to create asynchronous boundaries between synchronous Java ``Stream``
+ * and the rest of flow.
+ */
+ def fromJavaStream[T, S <: java.util.stream.BaseStream[T, S]](
+ stream: () => java.util.stream.BaseStream[T, S]): Source[T, NotUsed] =
+ StreamConverters.fromJavaStream(stream);
+
/**
* Creates [[Source]] that will continually produce given elements in specified order.
*