Docs: examples for io streams with StreamConverters (#28746)

This commit is contained in:
Ignasi Marimon-Clos 2020-03-27 09:56:59 +01:00 committed by GitHub
parent b40873dc10
commit f683241c40
7 changed files with 191 additions and 17 deletions


@@ -4,14 +4,38 @@ Create a source that wraps an `InputStream`.
@ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters)
@@@div { .group-scala }
## Signature
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #fromInputStream }
@@@
@apidoc[StreamConverters.fromInputStream](StreamConverters$) { scala="#fromInputStream(in:()=%3Ejava.io.InputStream,chunkSize:Int):akka.stream.scaladsl.Source[akka.util.ByteString,scala.concurrent.Future[akka.stream.IOResult]]" java="#fromInputStream(akka.japi.function.Creator)" }
## Description
Creates a Source from a `java.io.InputStream` created by the given function. Emitted elements are
@apidoc[akka.util.ByteString] elements of up to `chunkSize` bytes. The actual size of the emitted elements
depends on how much data the underlying `java.io.InputStream` returns on each read invocation; a chunk will
never be larger than `chunkSize`, though.
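This chunking mirrors plain `java.io.InputStream` reads. The following plain-JDK sketch (not Akka API; the class and method names are illustrative) shows that each read invocation returns at most `chunkSize` bytes and that the final chunk may be smaller:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class ChunkedReadDemo {

  // Collects the size of every read, mirroring how fromInputStream sizes the
  // ByteString elements it emits: at most chunkSize, possibly less.
  static List<Integer> chunkSizes(InputStream in, int chunkSize) {
    List<Integer> sizes = new ArrayList<>();
    byte[] buf = new byte[chunkSize];
    try {
      int read;
      while ((read = in.read(buf)) != -1) {
        sizes.add(read); // actual number of bytes this invocation returned
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return sizes;
  }

  public static void main(String[] args) {
    // 10 bytes read in chunks of at most 4: the last chunk is smaller.
    System.out.println(chunkSizes(new ByteArrayInputStream(new byte[10]), 4)); // [4, 4, 2]
  }
}
```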
You can configure the default dispatcher for this Source by changing
`akka.stream.materializer.blocking-io-dispatcher`, or set it for a given Source by
using `akka.stream.ActorAttributes`.
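A sketch of how such a dispatcher override could look in `application.conf` (the dispatcher name and pool size are illustrative assumptions, not values from Akka's reference configuration):

```hocon
# application.conf (sketch): replace the default blocking IO dispatcher.
akka.stream.materializer.blocking-io-dispatcher = "custom-blocking-io-dispatcher"

custom-blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
}
```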
It materializes a @java[`CompletionStage`]@scala[`Future`] of `IOResult` containing the number of bytes read from the stream
upon completion, and a possible exception if the IO operation was not completed successfully. Note that bytes having
been read by the source gives no guarantee that they were seen by downstream stages.
The created `InputStream` will be closed when the `Source` is cancelled.
See also @ref:[fromOutputStream](fromOutputStream.md)
## Example
Here is an example using both `fromInputStream` and `fromOutputStream` to read from a `java.io.InputStream`,
uppercase the read content, and write it back out into a `java.io.OutputStream`.
Scala
: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #tofromJavaIOStream }
Java
: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #tofromJavaIOStream }


@@ -6,7 +6,7 @@ Create a source that wraps a Java 8 `java.util.stream.Stream`.
## Signature
@apidoc[StreamConverters](StreamConverters$) { scala="#fromJavaStream%5BT,S%3C:java.util.stream.BaseStream[T,S]](stream:()=%3Ejava.util.stream.BaseStream[T,S]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#fromJavaStream(akka.japi.function.Creator)" }
## Example
@@ -17,4 +17,3 @@ Scala
Java
: @@snip [StreamConvertersToJava.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #fromJavaStream }
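The snippets above are not inlined in this view; as a plain-JDK illustration of why `fromJavaStream` takes a creator function rather than a stream instance (Akka calls appear only in comments; class and method names are illustrative):

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JavaStreamDemo {

  // fromJavaStream takes a creator because a java.util.stream.Stream is
  // single-use: every materialization needs a fresh instance.
  static Supplier<Stream<Integer>> creator(int n) {
    return () -> Stream.iterate(1, i -> i + 1).limit(n);
  }

  public static void main(String[] args) {
    // With Akka on the classpath this creator could be passed to
    // StreamConverters.fromJavaStream, yielding a Source emitting 1, 2, 3.
    List<Integer> elements = creator(3).get().collect(Collectors.toList());
    System.out.println(elements); // [1, 2, 3]
  }
}
```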


@@ -4,14 +4,34 @@ Create a sink that wraps an `OutputStream`.
@ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #fromOutputStream }
@@@
@apidoc[StreamConverters.fromOutputStream](StreamConverters$) { scala="#fromOutputStream(out:()=%3Ejava.io.OutputStream,autoFlush:Boolean):akka.stream.scaladsl.Sink[akka.util.ByteString,scala.concurrent.Future[akka.stream.IOResult]]" java="#fromOutputStream(akka.japi.function.Creator)" }
## Description
Creates a Sink which writes incoming @apidoc[akka.util.ByteString]s to a `java.io.OutputStream` created by the given function.
Materializes a @java[`CompletionStage`]@scala[`Future`] of `IOResult` that will be completed with the number of bytes written upon completion,
and a possible exception if the IO operation was not completed successfully.
You can configure the default dispatcher for this Sink by changing `akka.stream.materializer.blocking-io-dispatcher` or
set it for a given Sink by using `akka.stream.ActorAttributes`.
If `autoFlush` is true, the `OutputStream` is flushed whenever a byte array is written; it defaults to false.
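A plain-JDK sketch (illustrative names, not Akka API) of the buffering behavior `autoFlush` addresses: with a buffered `OutputStream`, written bytes may not reach the underlying target until a flush happens:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class AutoFlushDemo {

  // Returns the target's byte count before and after an explicit flush,
  // showing what autoFlush = true would do after every write.
  static int[] sizesAroundFlush(byte[] chunk) {
    ByteArrayOutputStream target = new ByteArrayOutputStream();
    try (BufferedOutputStream buffered = new BufferedOutputStream(target, 64)) {
      buffered.write(chunk);
      int before = target.size(); // bytes still sit in the buffer
      buffered.flush();
      int after = target.size(); // now visible in the target
      return new int[] {before, after};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] sizes = sizesAroundFlush("chunk".getBytes());
    System.out.println(sizes[0] + " -> " + sizes[1]); // 0 -> 5
  }
}
```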
The `OutputStream` will be closed when the stream flowing into this `Sink` is completed. The `Sink`
will cancel the stream when the `OutputStream` is no longer writable.
See also @ref:[fromInputStream](fromInputStream.md)
## Example
Here is an example using both `fromInputStream` and `fromOutputStream` to read from a `java.io.InputStream`,
uppercase the read content, and write it back out into a `java.io.OutputStream`.
Scala
: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #tofromJavaIOStream }
Java
: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #tofromJavaIOStream }


@@ -0,0 +1,83 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.converters;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.IOResult;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters;
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import jdocs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
public class ToFromJavaIOStreams extends AbstractJavaTest {

  static ActorSystem system;

  @BeforeClass
  public static void setup() {
    system = ActorSystem.create("ToFromJavaIOStreams");
  }

  @AfterClass
  public static void tearDown() {
    TestKit.shutdownActorSystem(system);
    system = null;
  }

  @Test
  public void demonstrateFromJavaIOStreams()
      throws InterruptedException, ExecutionException, TimeoutException {
    Charset charset = Charset.defaultCharset();
    byte[] bytes = "Some random input".getBytes(charset);

    Flow<ByteString, ByteString, NotUsed> toUpperCase =
        Flow.<ByteString>create()
            .map(
                bs -> {
                  String str = bs.decodeString(charset).toUpperCase();
                  return ByteString.fromString(str, charset);
                });

    // #tofromJavaIOStream
    java.io.InputStream inputStream = new ByteArrayInputStream(bytes);
    Source<ByteString, CompletionStage<IOResult>> source =
        StreamConverters.fromInputStream(() -> inputStream);

    // Given a ByteString produces a ByteString with the upperCase representation.
    // Removed from the sample for brevity.
    // Flow<ByteString, ByteString, NotUsed> toUpperCase = ...

    java.io.OutputStream outputStream = new ByteArrayOutputStream();
    Sink<ByteString, CompletionStage<IOResult>> sink =
        StreamConverters.fromOutputStream(() -> outputStream);

    CompletionStage<IOResult> ioResultCompletionStage =
        source.via(toUpperCase).runWith(sink, system);
    // When the ioResultCompletionStage completes, the byte array backing the outputStream
    // will contain the uppercase representation of the bytes read from the inputStream.
    // #tofromJavaIOStream

    ioResultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
    assertEquals(
        "SOME RANDOM INPUT",
        new String(((ByteArrayOutputStream) outputStream).toByteArray(), charset));
  }
}


@@ -0,0 +1,48 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.converters
// #import
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.StreamConverters
import akka.util.ByteString
// #import
import akka.testkit.AkkaSpec
import org.scalatest.concurrent.Futures
import scala.concurrent.Future
class ToFromJavaIOStreams extends AkkaSpec with Futures {

  "demonstrate conversion from java.io.streams" in {
    //#tofromJavaIOStream
    val bytes = "Some random input".getBytes
    val inputStream = new ByteArrayInputStream(bytes)
    val outputStream = new ByteArrayOutputStream()

    val source: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream(() => inputStream)
    val toUpperCase: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].map(_.map(_.toChar.toUpper.toByte))
    val sink: Sink[ByteString, Future[IOResult]] = StreamConverters.fromOutputStream(() => outputStream)
    val eventualResult = source.via(toUpperCase).runWith(sink)
    //#tofromJavaIOStream

    whenReady(eventualResult) { _ =>
      outputStream.toByteArray.map(_.toChar).mkString should be("SOME RANDOM INPUT")
    }
  }
}


@@ -121,7 +121,7 @@ object StreamConverters {
/**
* Creates a Source from an [[java.io.InputStream]] created by the given function.
* Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements.
* The actual size of the emitted elements depends on how much data the underlying
* [[java.io.InputStream]] returns on each read invocation. Such chunks will
* never be larger than chunkSize though.
*
@@ -142,7 +142,7 @@ object StreamConverters {
/**
* Creates a Source from an [[java.io.InputStream]] created by the given function.
* Emitted elements are up to 8192 bytes sized [[akka.util.ByteString]] elements.
* The actual size of the emitted elements depends on how much data the underlying
* [[java.io.InputStream]] returns on each read invocation. Such chunks will
* never be larger than chunkSize though.
*


@@ -27,7 +27,7 @@ object StreamConverters {
/**
* Creates a Source from an [[InputStream]] created by the given function.
* Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements.
* The actual size of the emitted elements depends on how much data the underlying
* [[java.io.InputStream]] returns on each read invocation. Such chunks will
* never be larger than chunkSize though.
*