diff --git a/akka-docs/src/main/paradox/stream/operators/StreamConverters/asJavaStream.md b/akka-docs/src/main/paradox/stream/operators/StreamConverters/asJavaStream.md index 8d960f8fe1..29612a594a 100644 --- a/akka-docs/src/main/paradox/stream/operators/StreamConverters/asJavaStream.md +++ b/akka-docs/src/main/paradox/stream/operators/StreamConverters/asJavaStream.md @@ -4,11 +4,9 @@ Create a sink which materializes into Java 8 `Stream` that can be run to trigger @ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters) -@@@ div { .group-scala } ## Signature -@@signature [StreamConverters.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #asJavaStream } -@@@ +@apidoc[StreamConverters](StreamConverters$) { scala="#asJavaStream[T]():akka.stream.scaladsl.Sink[T,java.util.stream.Stream[T]]" java="#asJavaStream()" } ## Description @@ -16,10 +14,21 @@ Create a sink which materializes into Java 8 `Stream` that can be run to trigger Elements emitted through the stream will be available for reading through the Java 8 `Stream`. The Java 8 `Stream` will be ended when the stream flowing into this `Sink` completes, and closing the Java -`Stream` will cancel the inflow of this `Sink`. Java `Stream` throws exception in case reactive stream failed. +`Stream` will cancel the inflow of this `Sink`. If the Java `Stream` throws an exception, the Akka stream is cancelled. Be aware that Java `Stream` blocks current thread while waiting on next element from downstream. +## Example + +Here is an example of a @apidoc[Sink] that materializes into a @javadoc[java.util.stream.Stream](java.util.stream.Stream). + +Scala +: @@snip [StreamConvertersToJava.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala) { #import #asJavaStream } + +Java +: @@snip [StreamConvertersToJava.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #asJavaStream } + + ## Reactive Streams semantics @@@div { .callout } @@ -27,4 +36,3 @@ Be aware that Java `Stream` blocks current thread while waiting on next element **backpressures** when no read is pending on the Java Stream @@@ - diff --git a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md index 6f7135c7f3..e76494a0ab 100644 --- a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md +++ b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md @@ -1,17 +1,20 @@ # StreamConverters.fromJavaStream -Create a source that wraps a Java 8 `Stream`. +Create a source that wraps a Java 8 `java.util.stream.Stream`. @ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters) -@@@div { .group-scala } - ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #fromJavaStream } +@apidoc[StreamConverters](StreamConverters$) { scala="#fromJavaStream[T,S%3C:java.util.stream.BaseStream[T,S]](stream:()=%3Ejava.util.stream.BaseStream[T,S]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#fromJavaStream(akka.japi.function.Creator)" } -@@@ +## Example -## Description +Here is an example of a @apidoc[Source] created from a @javadoc[java.util.stream.Stream](java.util.stream.Stream). + +Scala +: @@snip [StreamConvertersToJava.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala) { #import #fromJavaStream } + +Java +: @@snip [StreamConvertersToJava.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #fromJavaStream } -TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646 diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index c1dacfa03c..8fcd2c18e2 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -116,7 +116,7 @@ For example, following snippet will fall with timeout exception: |StreamConverters|@ref[asJavaStream](StreamConverters/asJavaStream.md)|Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.| |StreamConverters|@ref[asOutputStream](StreamConverters/asOutputStream.md)|Create a source that materializes into an `OutputStream`.| |StreamConverters|@ref[fromInputStream](StreamConverters/fromInputStream.md)|Create a source that wraps an `InputStream`.| -|StreamConverters|@ref[fromJavaStream](StreamConverters/fromJavaStream.md)|Create a source that wraps a Java 8 `Stream`.| +|StreamConverters|@ref[fromJavaStream](StreamConverters/fromJavaStream.md)|Create a source that wraps a Java 8 `java.util.stream.Stream`.| |StreamConverters|@ref[fromOutputStream](StreamConverters/fromOutputStream.md)|Create a sink that wraps an `OutputStream`.| |StreamConverters|@ref[javaCollector](StreamConverters/javaCollector.md)|Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` transformation and reduction operations.| |StreamConverters|@ref[javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)|Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` transformation and reduction operations.| diff --git a/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java b/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java new file mode 100644 index 0000000000..e47bd6f7f6 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.converters; + +import akka.NotUsed; +import akka.actor.ActorSystem; +// #import +import akka.japi.function.Creator; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.StreamConverters; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.BaseStream; +import java.util.stream.IntStream; +import java.util.stream.Stream; +// #import +import akka.testkit.javadsl.TestKit; +import jdocs.AbstractJavaTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import akka.stream.javadsl.Source; + +import static org.junit.Assert.assertEquals; + +/** */ +public class StreamConvertersToJava extends AbstractJavaTest { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("StreamConvertersToJava"); + } + + @AfterClass + public static void tearDown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void demonstrateConverterToJava8Stream() { + // #asJavaStream + + Source source = Source.range(0, 9).filter(i -> i % 2 == 0); + + Sink> sink = StreamConverters.asJavaStream(); + + Stream jStream = source.runWith(sink, system); + + // #asJavaStream + assertEquals(5, jStream.count()); + } + + @Test + public void demonstrateCreatingASourceFromJava8Stream() + throws InterruptedException, ExecutionException, TimeoutException { + // #fromJavaStream + + Creator> creator = () -> IntStream.rangeClosed(0, 9); + Source source = StreamConverters.fromJavaStream(creator); + + Sink> sink = Sink.last(); + + CompletionStage integerCompletionStage = source.runWith(sink, system); + // #fromJavaStream + assertEquals( + 9, integerCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS).intValue()); + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala b/akka-docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala new file mode 100644 index 0000000000..958088d96d --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.converters + +// #import +import java.util.stream +import java.util.stream.IntStream + +import akka.NotUsed +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.StreamConverters +// #import +import akka.testkit.AkkaSpec +import org.scalatest.concurrent.Futures + +import scala.collection.immutable +import scala.concurrent.Future + +class StreamConvertersToJava extends AkkaSpec with Futures { + + "demonstrate materialization to Java8 streams" in { + //#asJavaStream + val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0) + + val sink: Sink[Int, stream.Stream[Int]] = StreamConverters.asJavaStream[Int]() + + val jStream: java.util.stream.Stream[Int] = source.runWith(sink) + //#asJavaStream + jStream.count should be(5) + } + + "demonstrate conversion from Java8 streams" in { + //#fromJavaStream + def factory(): IntStream = IntStream.rangeClosed(0, 9) + val source: Source[Int, NotUsed] = StreamConverters.fromJavaStream(factory).map(_.intValue()) + val sink: Sink[Int, Future[immutable.Seq[Int]]] = Sink.seq[Int] + + val futureInts: Future[immutable.Seq[Int]] = source.toMat(sink)(Keep.right).run + + //#fromJavaStream + whenReady(futureInts) { ints => + ints should be((0 to 9).toSeq) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 00fcb48f2d..560a085a21 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -170,7 +170,7 @@ object StreamConverters { * The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java * ``Stream`` will cancel the inflow of this ``Sink``. * - * Java 8 ``Stream`` throws exception in case reactive stream failed. + * If the Java 8 ``Stream`` throws exception the Akka stream is cancelled. * * Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream. * As it is interacting wit blocking API the implementation runs on a separate dispatcher @@ -210,7 +210,7 @@ object StreamConverters { * Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its * elements and send them downstream on demand. * - * Example usage: `Source.fromJavaStream(() => IntStream.rangeClosed(1, 10))` + * Example usage: `StreamConverters.fromJavaStream(() => IntStream.rangeClosed(1, 10))` * * You can use [[Source.async]] to create asynchronous boundaries between synchronous Java ``Stream`` * and the rest of flow.