Docs: Example on the as/fromJavaStream operators (#28745)

This commit is contained in:
Ignasi Marimon-Clos 2020-03-18 15:54:21 +01:00 committed by GitHub
parent 94d5b04d93
commit 3d8b4360f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 154 additions and 15 deletions

View file

@ -4,11 +4,9 @@ Create a sink which materializes into Java 8 `Stream` that can be run to trigger
@ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters)
@@@ div { .group-scala }
## Signature
@@signature [StreamConverters.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #asJavaStream }
@@@
@apidoc[StreamConverters](StreamConverters$) { scala="#asJavaStream[T]():akka.stream.scaladsl.Sink[T,java.util.stream.Stream[T]]" java="#asJavaStream()" }
## Description
@ -16,10 +14,21 @@ Create a sink which materializes into Java 8 `Stream` that can be run to trigger
Elements emitted through the stream will be available for reading through the Java 8 `Stream`.
The Java 8 `Stream` will be ended when the stream flowing into this `Sink` completes, and closing the Java
`Stream` will cancel the inflow of this `Sink`. Java `Stream` throws exception in case reactive stream failed.
`Stream` will cancel the inflow of this `Sink`. If the Java `Stream` throws an exception, the Akka stream is cancelled.
Be aware that Java `Stream` blocks current thread while waiting on next element from downstream.
## Example
Here is an example of a @apidoc[Sink] that materializes into a @javadoc[java.util.stream.Stream](java.util.stream.Stream).
Scala
: @@snip [StreamConvertersToJava.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala) { #import #asJavaStream }
Java
: @@snip [StreamConvertersToJava.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #asJavaStream }
## Reactive Streams semantics
@@@div { .callout }
@ -27,4 +36,3 @@ Be aware that Java `Stream` blocks current thread while waiting on next element
**backpressures** when no read is pending on the Java Stream
@@@

View file

@ -1,17 +1,20 @@
# StreamConverters.fromJavaStream
Create a source that wraps a Java 8 `Stream`.
Create a source that wraps a Java 8 `java.util.stream.Stream`.
@ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters)
@@@div { .group-scala }
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #fromJavaStream }
@apidoc[StreamConverters](StreamConverters$) { scala="#fromJavaStream[T,S%3C:java.util.stream.BaseStream[T,S]](stream:()=%3Ejava.util.stream.BaseStream[T,S]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#fromJavaStream(akka.japi.function.Creator)" }
@@@
## Example
## Description
Here is an example of a @apidoc[Source] created from a @javadoc[java.util.stream.Stream](java.util.stream.Stream).
Scala
: @@snip [StreamConvertersToJava.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/StreamConvertersToJava.scala) { #import #fromJavaStream }
Java
: @@snip [StreamConvertersToJava.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #fromJavaStream }
TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646

View file

@ -116,7 +116,7 @@ For example, following snippet will fall with timeout exception:
|StreamConverters|<a name="asjavastream"></a>@ref[asJavaStream](StreamConverters/asJavaStream.md)|Create a sink which materializes into Java 8 `Stream` that can be run to trigger demand through the sink.|
|StreamConverters|<a name="asoutputstream"></a>@ref[asOutputStream](StreamConverters/asOutputStream.md)|Create a source that materializes into an `OutputStream`.|
|StreamConverters|<a name="frominputstream"></a>@ref[fromInputStream](StreamConverters/fromInputStream.md)|Create a source that wraps an `InputStream`.|
|StreamConverters|<a name="fromjavastream"></a>@ref[fromJavaStream](StreamConverters/fromJavaStream.md)|Create a source that wraps a Java 8 `Stream`.|
|StreamConverters|<a name="fromjavastream"></a>@ref[fromJavaStream](StreamConverters/fromJavaStream.md)|Create a source that wraps a Java 8 `java.util.stream.Stream`.|
|StreamConverters|<a name="fromoutputstream"></a>@ref[fromOutputStream](StreamConverters/fromOutputStream.md)|Create a sink that wraps an `OutputStream`.|
|StreamConverters|<a name="javacollector"></a>@ref[javaCollector](StreamConverters/javaCollector.md)|Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` transformation and reduction operations.|
|StreamConverters|<a name="javacollectorparallelunordered"></a>@ref[javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md)|Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector` transformation and reduction operations.|

View file

@ -0,0 +1,77 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.converters;
import akka.NotUsed;
import akka.actor.ActorSystem;
// #import
import akka.japi.function.Creator;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.StreamConverters;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.BaseStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
// #import
import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.stream.javadsl.Source;
import static org.junit.Assert.assertEquals;
/** */
public class StreamConvertersToJava extends AbstractJavaTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("StreamConvertersToJava");
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void demonstrateConverterToJava8Stream() {
// #asJavaStream
Source<Integer, NotUsed> source = Source.range(0, 9).filter(i -> i % 2 == 0);
Sink<Integer, java.util.stream.Stream<Integer>> sink = StreamConverters.<Integer>asJavaStream();
Stream<Integer> jStream = source.runWith(sink, system);
// #asJavaStream
assertEquals(5, jStream.count());
}
@Test
public void demonstrateCreatingASourceFromJava8Stream()
throws InterruptedException, ExecutionException, TimeoutException {
// #fromJavaStream
Creator<BaseStream<Integer, IntStream>> creator = () -> IntStream.rangeClosed(0, 9);
Source<Integer, NotUsed> source = StreamConverters.fromJavaStream(creator);
Sink<Integer, CompletionStage<Integer>> sink = Sink.last();
CompletionStage<Integer> integerCompletionStage = source.runWith(sink, system);
// #fromJavaStream
assertEquals(
9, integerCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS).intValue());
}
}

View file

@ -0,0 +1,51 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.converters
// #import
import java.util.stream
import java.util.stream.IntStream
import akka.NotUsed
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.StreamConverters
// #import
import akka.testkit.AkkaSpec
import org.scalatest.concurrent.Futures
import scala.collection.immutable
import scala.concurrent.Future
class StreamConvertersToJava extends AkkaSpec with Futures {
"demonstrate materialization to Java8 streams" in {
//#asJavaStream
val source: Source[Int, NotUsed] = Source(0 to 9).filter(_ % 2 == 0)
val sink: Sink[Int, stream.Stream[Int]] = StreamConverters.asJavaStream[Int]()
val jStream: java.util.stream.Stream[Int] = source.runWith(sink)
//#asJavaStream
jStream.count should be(5)
}
"demonstrate conversion from Java8 streams" in {
//#fromJavaStream
def factory(): IntStream = IntStream.rangeClosed(0, 9)
val source: Source[Int, NotUsed] = StreamConverters.fromJavaStream(factory).map(_.intValue())
val sink: Sink[Int, Future[immutable.Seq[Int]]] = Sink.seq[Int]
val futureInts: Future[immutable.Seq[Int]] = source.toMat(sink)(Keep.right).run
//#fromJavaStream
whenReady(futureInts) { ints =>
ints should be((0 to 9).toSeq)
}
}
}

View file

@ -170,7 +170,7 @@ object StreamConverters {
* The Java 8 ``Stream`` will be ended when the stream flowing into this ``Sink`` completes, and closing the Java
* ``Stream`` will cancel the inflow of this ``Sink``.
*
* Java 8 ``Stream`` throws exception in case reactive stream failed.
* If the Java 8 ``Stream`` throws exception the Akka stream is cancelled.
*
* Be aware that Java ``Stream`` blocks current thread while waiting on next element from downstream.
* As it is interacting wit blocking API the implementation runs on a separate dispatcher
@ -210,7 +210,7 @@ object StreamConverters {
* Creates a source that wraps a Java 8 ``Stream``. ``Source`` uses a stream iterator to get all its
* elements and send them downstream on demand.
*
* Example usage: `Source.fromJavaStream(() => IntStream.rangeClosed(1, 10))`
* Example usage: `StreamConverters.fromJavaStream(() => IntStream.rangeClosed(1, 10))`
*
* You can use [[Source.async]] to create asynchronous boundaries between synchronous Java ``Stream``
* and the rest of flow.