From e29013038623f960b6246ee5ecdc466a4a4cf369 Mon Sep 17 00:00:00 2001 From: Nitika Agarwal <54163056+nitikagarw@users.noreply.github.com> Date: Mon, 30 Nov 2020 15:09:14 +0530 Subject: [PATCH] Add examples for asInputStream and asOutputStream (#29830) --- .../StreamConverters/asInputStream.md | 8 +++ .../StreamConverters/asOutputStream.md | 8 +++ .../converters/ToFromJavaIOStreams.java | 54 ++++++++++++++++++- .../converters/ToFromJavaIOStreams.scala | 37 ++++++++++--- 4 files changed, 100 insertions(+), 7 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/StreamConverters/asInputStream.md b/akka-docs/src/main/paradox/stream/operators/StreamConverters/asInputStream.md index 7467230b1c..291cddabbd 100644 --- a/akka-docs/src/main/paradox/stream/operators/StreamConverters/asInputStream.md +++ b/akka-docs/src/main/paradox/stream/operators/StreamConverters/asInputStream.md @@ -25,3 +25,11 @@ The `InputStream` will be ended when the stream flowing into this `Sink` complet **backpressures** when no read is pending on the `InputStream` @@@ +## Example +Here is an example of a @apidoc[Sink] that reads the contents from the source, converts it into uppercase and materializes into a @javadoc[java.io.InputStream](java.io.InputStream) + +Scala +: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #asJavaInputStream } + +Java +: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #asJavaInputStream } diff --git a/akka-docs/src/main/paradox/stream/operators/StreamConverters/asOutputStream.md b/akka-docs/src/main/paradox/stream/operators/StreamConverters/asOutputStream.md index 8385da91b4..c985b445c6 100644 --- a/akka-docs/src/main/paradox/stream/operators/StreamConverters/asOutputStream.md +++ b/akka-docs/src/main/paradox/stream/operators/StreamConverters/asOutputStream.md @@ -24,3 +24,11 @@ closing the `OutputStream` will complete the `Source`. **completes** when the `OutputStream` is closed @@@ +## Example +Here is an example of a @apidoc[Source] that materializes into a @javadoc[java.io.OutputStream](java.io.OutputStream), and is connected to a Sink which concatenates the incoming @apidoc[akka.util.ByteString]s + +Scala +: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #asJavaOutputStream } + +Java +: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #asJavaOutputStream } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java b/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java index c9aa6c538f..683f7a9c2e 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java @@ -6,9 +6,10 @@ package jdocs.stream.operators.converters; import akka.NotUsed; import akka.actor.ActorSystem; +import akka.japi.Pair; import akka.stream.IOResult; -import akka.stream.Materializer; import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.stream.javadsl.StreamConverters; @@ -21,12 +22,16 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.Charset; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static akka.util.ByteString.emptyByteString; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; public class ToFromJavaIOStreams extends AbstractJavaTest { @@ -80,4 +85,51 @@ public class ToFromJavaIOStreams extends AbstractJavaTest { "SOME RANDOM INPUT", new String(((ByteArrayOutputStream) outputStream).toByteArray(), charset)); } + + @Test + public void demonstrateAsJavaInputStream() throws Exception { + + // #asJavaInputStream + Charset charset = Charset.defaultCharset(); + Flow toUpperCase = + Flow.create() + .map( + bs -> { + String str = bs.decodeString(charset).toUpperCase(); + return ByteString.fromString(str, charset); + }); + + final Sink sink = StreamConverters.asInputStream(); + final InputStream stream = + Source.single(ByteString.fromString("Some random input")) + .via(toUpperCase) + .runWith(sink, system); + + // #asJavaInputStream + byte[] a = new byte[17]; + stream.read(a); + assertArrayEquals("SOME RANDOM INPUT".getBytes(), a); + } + + @Test + public void demonstrateAsJavaOutputStream() throws Exception { + + // #asJavaOutputStream + final Source source = StreamConverters.asOutputStream(); + final Sink> sink = + Sink.fold(emptyByteString(), (ByteString arg1, ByteString arg2) -> arg1.concat(arg2)); + + final Pair> output = + source.toMat(sink, Keep.both()).run(system); + + // #asJavaOutputStream + byte[] bytesArray = new byte[3]; + output.first().write(bytesArray); + output.first().close(); + + final byte[] expected = + output.second().toCompletableFuture().get(5, TimeUnit.SECONDS).toArray(); + + assertArrayEquals(expected, bytesArray); + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala b/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala index 831ab3f146..9fd600dff7 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala @@ -5,16 +5,14 @@ package docs.stream.operators.converters // #import -import java.io.ByteArrayInputStream -import java.io.ByteArrayOutputStream +import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream } import akka.NotUsed import akka.stream.IOResult -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.stream.scaladsl.StreamConverters +import akka.stream.scaladsl.{ Flow, Keep, Sink, Source, StreamConverters } import akka.util.ByteString + +import scala.util.Random // #import import akka.testkit.AkkaSpec import org.scalatest.concurrent.Futures @@ -45,4 +43,31 @@ class ToFromJavaIOStreams extends AkkaSpec with Futures { } + "demonstrate usage as java.io.InputStream" in { + //#asJavaInputStream + val toUpperCase: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].map(_.map(_.toChar.toUpper.toByte)) + val source: Source[ByteString, NotUsed] = Source.single(ByteString("some random input")) + val sink: Sink[ByteString, InputStream] = StreamConverters.asInputStream() + + val inputStream: InputStream = source.via(toUpperCase).runWith(sink) + //#asJavaInputStream + inputStream.read() should be('S') + inputStream.close() + } + + "demonstrate usage as java.io.OutputStream" in { + //#asJavaOutputStream + val source: Source[ByteString, OutputStream] = StreamConverters.asOutputStream() + val sink: Sink[ByteString, Future[ByteString]] = Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _) + + val (outputStream, result): (OutputStream, Future[ByteString]) = + source.toMat(sink)(Keep.both).run() + + //#asJavaOutputStream + val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte]) + outputStream.write(bytesArray) + outputStream.close() + result.futureValue should be(ByteString(bytesArray)) + } + }