Add examples for asInputStream and asOutputStream (#29830)
This commit is contained in:
parent
385e075fdc
commit
e290130386
4 changed files with 100 additions and 7 deletions
|
|
@ -25,3 +25,11 @@ The `InputStream` will be ended when the stream flowing into this `Sink` complet
|
||||||
**backpressures** when no read is pending on the `InputStream`
|
**backpressures** when no read is pending on the `InputStream`
|
||||||
@@@
|
@@@
|
||||||
|
|
||||||
|
## Example
|
||||||
|
Here is an example of a @apidoc[Sink] that reads the contents from the source, converts it into uppercase and materializes into a @javadoc[java.io.InputStream](java.io.InputStream)
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #asJavaInputStream }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #asJavaInputStream }
|
||||||
|
|
|
||||||
|
|
@ -24,3 +24,11 @@ closing the `OutputStream` will complete the `Source`.
|
||||||
**completes** when the `OutputStream` is closed
|
**completes** when the `OutputStream` is closed
|
||||||
@@@
|
@@@
|
||||||
|
|
||||||
|
## Example
|
||||||
|
Here is an example of a @apidoc[Source] that materializes into a @javadoc[java.io.OutputStream](java.io.OutputStream), and is connected to a Sink which concatenates the incoming @apidoc[akka.util.ByteString]s
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ToFromJavaIOStreams.scala](/akka-docs/src/test/scala/docs/stream/operators/converters/ToFromJavaIOStreams.scala) { #asJavaOutputStream }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ToFromJavaIOStreams.java](/akka-docs/src/test/java/jdocs/stream/operators/converters/ToFromJavaIOStreams.java) { #asJavaOutputStream }
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,10 @@ package jdocs.stream.operators.converters;
|
||||||
|
|
||||||
import akka.NotUsed;
|
import akka.NotUsed;
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.japi.Pair;
|
||||||
import akka.stream.IOResult;
|
import akka.stream.IOResult;
|
||||||
import akka.stream.Materializer;
|
|
||||||
import akka.stream.javadsl.Flow;
|
import akka.stream.javadsl.Flow;
|
||||||
|
import akka.stream.javadsl.Keep;
|
||||||
import akka.stream.javadsl.Sink;
|
import akka.stream.javadsl.Sink;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import akka.stream.javadsl.StreamConverters;
|
import akka.stream.javadsl.StreamConverters;
|
||||||
|
|
@ -21,12 +22,16 @@ import org.junit.Test;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import static akka.util.ByteString.emptyByteString;
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class ToFromJavaIOStreams extends AbstractJavaTest {
|
public class ToFromJavaIOStreams extends AbstractJavaTest {
|
||||||
|
|
@ -80,4 +85,51 @@ public class ToFromJavaIOStreams extends AbstractJavaTest {
|
||||||
"SOME RANDOM INPUT",
|
"SOME RANDOM INPUT",
|
||||||
new String(((ByteArrayOutputStream) outputStream).toByteArray(), charset));
|
new String(((ByteArrayOutputStream) outputStream).toByteArray(), charset));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void demonstrateAsJavaInputStream() throws Exception {
|
||||||
|
|
||||||
|
// #asJavaInputStream
|
||||||
|
Charset charset = Charset.defaultCharset();
|
||||||
|
Flow<ByteString, ByteString, NotUsed> toUpperCase =
|
||||||
|
Flow.<ByteString>create()
|
||||||
|
.map(
|
||||||
|
bs -> {
|
||||||
|
String str = bs.decodeString(charset).toUpperCase();
|
||||||
|
return ByteString.fromString(str, charset);
|
||||||
|
});
|
||||||
|
|
||||||
|
final Sink<ByteString, InputStream> sink = StreamConverters.asInputStream();
|
||||||
|
final InputStream stream =
|
||||||
|
Source.single(ByteString.fromString("Some random input"))
|
||||||
|
.via(toUpperCase)
|
||||||
|
.runWith(sink, system);
|
||||||
|
|
||||||
|
// #asJavaInputStream
|
||||||
|
byte[] a = new byte[17];
|
||||||
|
stream.read(a);
|
||||||
|
assertArrayEquals("SOME RANDOM INPUT".getBytes(), a);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void demonstrateAsJavaOutputStream() throws Exception {
|
||||||
|
|
||||||
|
// #asJavaOutputStream
|
||||||
|
final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream();
|
||||||
|
final Sink<ByteString, CompletionStage<ByteString>> sink =
|
||||||
|
Sink.fold(emptyByteString(), (ByteString arg1, ByteString arg2) -> arg1.concat(arg2));
|
||||||
|
|
||||||
|
final Pair<OutputStream, CompletionStage<ByteString>> output =
|
||||||
|
source.toMat(sink, Keep.both()).run(system);
|
||||||
|
|
||||||
|
// #asJavaOutputStream
|
||||||
|
byte[] bytesArray = new byte[3];
|
||||||
|
output.first().write(bytesArray);
|
||||||
|
output.first().close();
|
||||||
|
|
||||||
|
final byte[] expected =
|
||||||
|
output.second().toCompletableFuture().get(5, TimeUnit.SECONDS).toArray();
|
||||||
|
|
||||||
|
assertArrayEquals(expected, bytesArray);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,16 +5,14 @@
|
||||||
package docs.stream.operators.converters
|
package docs.stream.operators.converters
|
||||||
|
|
||||||
// #import
|
// #import
|
||||||
import java.io.ByteArrayInputStream
|
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream }
|
||||||
import java.io.ByteArrayOutputStream
|
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.stream.IOResult
|
import akka.stream.IOResult
|
||||||
import akka.stream.scaladsl.Flow
|
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source, StreamConverters }
|
||||||
import akka.stream.scaladsl.Sink
|
|
||||||
import akka.stream.scaladsl.Source
|
|
||||||
import akka.stream.scaladsl.StreamConverters
|
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
|
|
||||||
|
import scala.util.Random
|
||||||
// #import
|
// #import
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import org.scalatest.concurrent.Futures
|
import org.scalatest.concurrent.Futures
|
||||||
|
|
@ -45,4 +43,31 @@ class ToFromJavaIOStreams extends AkkaSpec with Futures {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"demonstrate usage as java.io.InputStream" in {
|
||||||
|
//#asJavaInputStream
|
||||||
|
val toUpperCase: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].map(_.map(_.toChar.toUpper.toByte))
|
||||||
|
val source: Source[ByteString, NotUsed] = Source.single(ByteString("some random input"))
|
||||||
|
val sink: Sink[ByteString, InputStream] = StreamConverters.asInputStream()
|
||||||
|
|
||||||
|
val inputStream: InputStream = source.via(toUpperCase).runWith(sink)
|
||||||
|
//#asJavaInputStream
|
||||||
|
inputStream.read() should be('S')
|
||||||
|
inputStream.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
"demonstrate usage as java.io.OutputStream" in {
|
||||||
|
//#asJavaOutputStream
|
||||||
|
val source: Source[ByteString, OutputStream] = StreamConverters.asOutputStream()
|
||||||
|
val sink: Sink[ByteString, Future[ByteString]] = Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _)
|
||||||
|
|
||||||
|
val (outputStream, result): (OutputStream, Future[ByteString]) =
|
||||||
|
source.toMat(sink)(Keep.both).run()
|
||||||
|
|
||||||
|
//#asJavaOutputStream
|
||||||
|
val bytesArray = Array.fill[Byte](3)(Random.nextInt(1024).asInstanceOf[Byte])
|
||||||
|
outputStream.write(bytesArray)
|
||||||
|
outputStream.close()
|
||||||
|
result.futureValue should be(ByteString(bytesArray))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue