diff --git a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromInputStream.md b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromInputStream.md index 5b948ac8dc..b475bf0f06 100644 --- a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromInputStream.md +++ b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromInputStream.md @@ -4,14 +4,38 @@ Create a source that wraps an `InputStream`. @ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters) -@@@div { .group-scala } - ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #fromInputStream } - -@@@ +@apidoc[StreamConverters.fromInputStream](StreamConverters$) { scala="#fromInputStream(in:()=%3Ejava.io.InputStream,chunkSize:Int):akka.stream.scaladsl.Source[akka.util.ByteString,scala.concurrent.Future[akka.stream.IOResult]]" java="#fromInputStream(akka.japi.function.Creator)" } ## Description -TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646 +Creates a Source from a `java.io.InputStream` created by the given function. Emitted elements are up to `chunkSize` +sized @apidoc[akka.util.ByteString]s elements. The actual size of the emitted elements depends on how much data the +underlying `java.io.InputStream` returns on each read invocation. Such chunks will never be larger +than `chunkSize` though. + +You can configure the default dispatcher for this Source by changing +the `akka.stream.materializer.blocking-io-dispatcher` or set it for a given Source by +using `akka.stream.ActorAttributes`. + +It materializes a @java[`CompletionStage`]@scala[`Future`] of `IOResult` containing the number of bytes read from the source file +upon completion, and a possible exception if IO operation was not completed successfully. Note that bytes having +been read by the source does not give any guarantee that the bytes were seen by downstream stages. + +The created `InputStream` will be closed when the `Source` is cancelled. + +See also @ref:[fromOutputStream](fromOutputStream.md) + + +## Example + +Here is an example using both `fromInputStream` and `fromOutputStream` to read from a `java.io.InputStream`, +uppercase the read content and write back out into a `java.io.OutputStream`. + +Scala +: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #tofromJavaIOStream } + +Java +: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #tofromJavaIOStream } + diff --git a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md index e76494a0ab..170aabdf08 100644 --- a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md +++ b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromJavaStream.md @@ -6,7 +6,7 @@ Create a source that wraps a Java 8 `java.util.stream.Stream`. ## Signature -@apidoc[StreamConverters](StreamConverters$) { scala="#fromJavaStream[T,S%3C:java.util.stream.BaseStream[T,S]](stream:()=%3Ejava.util.stream.BaseStream[T,S]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#fromJavaStream(akka.japi.function.Creator)" } +@apidoc[StreamConverters](StreamConverters$) { scala="#fromJavaStream%5BT,S%3C:java.util.stream.BaseStream[T,S]](stream:()=%3Ejava.util.stream.BaseStream[T,S]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#fromJavaStream(akka.japi.function.Creator)" } ## Example @@ -17,4 +17,3 @@ Scala Java : @@snip [StreamConvertersToJava.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/StreamConvertersToJava.java) { #import #fromJavaStream } - diff --git a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromOutputStream.md b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromOutputStream.md index cfd80de7da..3ac7e1fb60 100644 --- a/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromOutputStream.md +++ b/akka-docs/src/main/paradox/stream/operators/StreamConverters/fromOutputStream.md @@ -4,14 +4,34 @@ Create a sink that wraps an `OutputStream`. @ref[Additional Sink and Source converters](../index.md#additional-sink-and-source-converters) -@@@div { .group-scala } - ## Signature -@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala) { #fromOutputStream } - -@@@ +@apidoc[StreamConverters.fromOutputStream](StreamConverters$) { scala="#fromOutputStream(out:()=%3Cjava.io.OutputStream,autoFlush:Boolean):akka.stream.scaladsl.Sink[akka.util.ByteString,scala.concurrent.Future[akka.stream.IOResult]]" java="#fromOutputStream(akka.japi.function.Creator)" } ## Description -TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646 +Creates a Sink which writes incoming @apidoc[akka.util.ByteString]s to a `java.io.OutputStream` created by the given function. + +Materializes a @java[`CompletionStage`]@scala[`Future`] of `IOResult` that will be completed with the size of the file (in bytes) on completion, +and a possible exception if IO operation was not completed successfully. + +You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or +set it for a given Source by using `akka.stream.ActorAttributes`. + +If `autoFlush` is true the OutputStream will be flushed whenever a byte array is written, defaults to false. +The `OutputStream` will be closed when the stream flowing into this `Sink` is completed. The `Sink` +will cancel the stream when the `OutputStream` is no longer writable. + +See also @ref:[fromInputStream](fromInputStream.md) + +## Example + +Here is an example using both `fromInputStream` and `fromOutputStream` to read from a `java.io.InputStream`, +uppercase the read content and write back out into a `java.io.OutputStream`. + +Scala +: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #tofromJavaIOStream } + +Java +: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #tofromJavaIOStream } + diff --git a/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java b/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java new file mode 100644 index 0000000000..c9aa6c538f --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.converters; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.IOResult; +import akka.stream.Materializer; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.StreamConverters; +import akka.testkit.javadsl.TestKit; +import akka.util.ByteString; +import jdocs.AbstractJavaTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.charset.Charset; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; + +public class ToFromJavaIOStreams extends AbstractJavaTest { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("ToFromJavaIOStreams"); + } + + @AfterClass + public static void tearDown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + @Test + public void demonstrateFromJavaIOStreams() + throws InterruptedException, ExecutionException, TimeoutException { + Charset charset = Charset.defaultCharset(); + byte[] bytes = "Some random input".getBytes(charset); + Flow toUpperCase = + Flow.create() + .map( + bs -> { + String str = bs.decodeString(charset).toUpperCase(); + return ByteString.fromString(str, charset); + }); + + // #tofromJavaIOStream + java.io.InputStream inputStream = new ByteArrayInputStream(bytes); + Source> source = + StreamConverters.fromInputStream(() -> inputStream); + + // Given a ByteString produces a ByteString with the upperCase representation + // Removed from the sample for brevity. + // Flow toUpperCase = ... + + java.io.OutputStream outputStream = new ByteArrayOutputStream(); + Sink> sink = + StreamConverters.fromOutputStream(() -> outputStream); + + CompletionStage ioResultCompletionStage = + source.via(toUpperCase).runWith(sink, system); + // When the ioResultCompletionStage completes, the byte array backing the outputStream + // will contain the uppercase representation of the bytes read from the inputStream. + // #tofromJavaIOStream + ioResultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS); + assertEquals( + "SOME RANDOM INPUT", + new String(((ByteArrayOutputStream) outputStream).toByteArray(), charset)); + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala b/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala new file mode 100644 index 0000000000..831ab3f146 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.converters + +// #import +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream + +import akka.NotUsed +import akka.stream.IOResult +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.StreamConverters +import akka.util.ByteString +// #import +import akka.testkit.AkkaSpec +import org.scalatest.concurrent.Futures + +import scala.concurrent.Future + +class ToFromJavaIOStreams extends AkkaSpec with Futures { + + "demonstrate conversion from java.io.streams" in { + + //#tofromJavaIOStream + val bytes = "Some random input".getBytes + val inputStream = new ByteArrayInputStream(bytes) + val outputStream = new ByteArrayOutputStream() + + val source: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream(() => inputStream) + + val toUpperCase: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].map(_.map(_.toChar.toUpper.toByte)) + + val sink: Sink[ByteString, Future[IOResult]] = StreamConverters.fromOutputStream(() => outputStream) + + val eventualResult = source.via(toUpperCase).runWith(sink) + + //#tofromJavaIOStream + whenReady(eventualResult) { _ => + outputStream.toByteArray.map(_.toChar).mkString should be("SOME RANDOM INPUT") + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index 8f51ea5e13..06285322b5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -121,7 +121,7 @@ object StreamConverters { /** * Creates a Source from an [[java.io.InputStream]] created by the given function. * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements. - * The actual size of emitted elements depends how much data the underlying + * The actual size of the emitted elements depends on how much data the underlying * [[java.io.InputStream]] returns on each read invocation. Such chunks will * never be larger than chunkSize though. * @@ -142,7 +142,7 @@ object StreamConverters { /** * Creates a Source from an [[java.io.InputStream]] created by the given function. * Emitted elements are up to 8192 bytes sized [[akka.util.ByteString]] elements. - * The actual size of emitted elements depends how much data the underlying + * The actual size of the emitted elements depends on how much data the underlying * [[java.io.InputStream]] returns on each read invocation. Such chunks will * never be larger than chunkSize though. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 560a085a21..6fc893c080 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -27,7 +27,7 @@ object StreamConverters { /** * Creates a Source from an [[InputStream]] created by the given function. * Emitted elements are up to `chunkSize` sized [[akka.util.ByteString]] elements. - * The actual size of emitted elements depends how much data the underlying + * The actual size of the emitted elements depends on how much data the underlying * [[java.io.InputStream]] returns on each read invocation. Such chunks will * never be larger than chunkSize though. *