From 8780ba28a4712d9a9f4f327fe574fbc8fd349dba Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 17 Nov 2015 13:17:30 +0100 Subject: [PATCH] !str - Moving the InputStream and OutputStream utilities into Source and Sink --- .../akka/stream/io/FileSourcesBenchmark.scala | 2 +- .../rst/java/code/docs/MigrationsJava.java | 54 +++++++++++ .../rst/java/migration-guide-1.0-2.x-java.rst | 93 ++++++++++++++++++- .../rst/scala/code/docs/MigrationsScala.scala | 25 ++++- .../scala/migration-guide-1.0-2.x-scala.rst | 69 +++++++++++++- .../server/StreamingResponseSpecs.scala | 2 +- .../FileAndResourceDirectives.scala | 3 +- .../akka/stream/io/InputStreamSinkTest.java | 2 +- .../stream/io/OutputStreamSourceTest.java | 2 +- .../akka/stream/io/InputStreamSinkSpec.scala | 16 ++-- .../stream/io/InputStreamSourceSpec.scala | 4 +- .../akka/stream/io/OutputStreamSinkSpec.scala | 8 +- .../stream/io/OutputStreamSourceSpec.scala | 14 +-- .../main/scala/akka/stream/impl/Stages.scala | 8 +- .../stream/impl/io/InputStreamSinkStage.scala | 8 +- .../impl/io/OutputStreamSourceStage.scala | 8 +- .../main/scala/akka/stream/io/Implicits.scala | 41 -------- .../akka/stream/io/InputStreamSink.scala | 57 ------------ .../akka/stream/io/InputStreamSource.scala | 53 ----------- .../akka/stream/io/OutputStreamSink.scala | 44 --------- .../akka/stream/io/OutputStreamSource.scala | 60 ------------ .../main/scala/akka/stream/javadsl/Sink.scala | 47 +++++++++- .../scala/akka/stream/javadsl/Source.scala | 56 ++++++++++- .../scala/akka/stream/scaladsl/Sink.scala | 29 +++++- .../scala/akka/stream/scaladsl/Source.scala | 37 +++++++- 25 files changed, 432 insertions(+), 310 deletions(-) delete mode 100644 akka-stream/src/main/scala/akka/stream/io/Implicits.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala index 6277e0d02c..319e1d566a 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -52,7 +52,7 @@ class FileSourcesBenchmark { @Setup def setup() { fileChannelSource = Source.file(file, bufSize) - fileInputStreamSource = InputStreamSource(() ⇒ new FileInputStream(file), bufSize) + fileInputStreamSource = Source.inputStream(() ⇒ new FileInputStream(file), bufSize) ioSourceLinesIterator = Source(() ⇒ scala.io.Source.fromFile(file).getLines()).map(ByteString(_)) } diff --git a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java index 8200b9e549..c9444e6e54 100644 --- a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java +++ b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java @@ -2,6 +2,7 @@ package docs; import akka.actor.Cancellable; import akka.http.javadsl.model.Uri; +import akka.japi.function.Creator; import akka.japi.Pair; import akka.japi.function.Function; import akka.stream.*; @@ -14,6 +15,9 @@ import scala.concurrent.Promise; import scala.runtime.BoxedUnit; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.concurrent.TimeUnit; import java.nio.charset.Charset; @@ -22,6 +26,15 @@ public class MigrationsJava { // This is compile-only code, no need for actually running anything. public static ActorMaterializer mat = null; + public static class SomeInputStream extends InputStream { + public SomeInputStream() {} + @Override public int read() throws IOException { return 0; } + } + + public static class SomeOutputStream extends OutputStream { + @Override public void write(int b) throws IOException { return; } + } + public static void main(String[] args) { Outlet outlet = null; @@ -160,6 +173,47 @@ public class MigrationsJava { final Sink> fileSink = Sink.file(new File(".")); //#file-source-sink + + //#input-output-stream-source-sink + final Source> inputStreamSrc = + Source.inputStream(new Creator(){ + public InputStream create() { + return new SomeInputStream(); + } + }); + + final Source> otherInputStreamSrc = + Source.inputStream(new Creator(){ + public InputStream create() { + return new SomeInputStream(); + } + }, 1024); + + final Sink> outputStreamSink = + Sink.outputStream(new Creator(){ + public OutputStream create() { + return new SomeOutputStream(); + } + }); + //#input-output-stream-source-sink + + + //#output-input-stream-source-sink + final FiniteDuration timeout = FiniteDuration.Zero(); + + final Source outputStreamSrc = + Source.outputStream(); + + final Source otherOutputStreamSrc = + Source.outputStream(timeout); + + final Sink someInputStreamSink = + Sink.inputStream(); + + final Sink someOtherInputStreamSink = + Sink.inputStream(timeout); + //#output-input-stream-source-sink + } } diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index 4b847422d9..5273332bed 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -424,17 +424,17 @@ should be replaced by: SynchronousFileSource and SynchronousFileSink ============================================ -Both have been replaced by `Source.file(…)` and `Sink.file(…)` due to discoverability issues +Both have been replaced by ``Source.file(…)`` and ``Sink.file(…)`` due to discoverability issues paired with names which leaked internal implementation details. Update procedure ---------------- -Replace `SynchronousFileSource.create(` with `Source.file(` +Replace ``SynchronousFileSource.create(`` with ``Source.file(`` -Replace `SynchronousFileSink.create(` with `Sink.file(` +Replace ``SynchronousFileSink.create(`` with ``Sink.file(`` -Replace `SynchronousFileSink.appendTo(f)` with `Sink.file(f, true)` +Replace ``SynchronousFileSink.appendTo(f)`` with ``Sink.file(f, true)`` Example ^^^^^^^ @@ -455,4 +455,87 @@ Example should be replaced by -.. includecode:: code/docs/MigrationsJava.java#file-source-sink \ No newline at end of file +.. includecode:: code/docs/MigrationsJava.java#file-source-sink + +InputStreamSource and OutputStreamSink +====================================== + +Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues. + +Update procedure +---------------- + +Replace ``InputStreamSource.create(`` with ``Source.inputStream(`` + +Replace ``OutputStreamSink.create(`` with ``Sink.outputStream(`` + +Example +^^^^^^^ + +:: + + // This no longer works! + final Source> inputStreamSrc = + InputStreamSource.create(new Creator(){ + public InputStream create() { + return new SomeInputStream(); + } + }); + + // This no longer works! + final Source> otherInputStreamSrc = + InputStreamSource.create(new Creator(){ + public InputStream create() { + return new SomeInputStream(); + } + }, 1024); + + // This no longer works! + final Sink> outputStreamSink = + OutputStreamSink.create(new Creator(){ + public OutputStream create() { + return new SomeOutputStream(); + } + }) + +should be replaced by + +.. includecode:: code/docs/MigrationsJava.java#input-output-stream-source-sink + + +OutputStreamSource and InputStreamSink +====================================== + +Both have been replaced by ``Source.outputStream(…)`` and ``Sink.inputStream(…)`` due to discoverability issues. + +Update procedure +---------------- + +Replace ``OutputStreamSource.create(`` with ``Source.outputStream(`` + +Replace ``InputStreamSink.create(`` with ``Sink.inputStream(`` + +Example +^^^^^^^ + +:: + + // This no longer works! + final Source outputStreamSrc = + OutputStreamSource.create(); + + // This no longer works! + final Source otherOutputStreamSrc = + OutputStreamSource.create(timeout); + + // This no longer works! + final Sink someInputStreamSink = + InputStreamSink.create(); + + // This no longer works! + final Sink someOtherInputStreamSink = + InputStreamSink.create(timeout); + +should be replaced by + +.. includecode:: code/docs/MigrationsJava.java#output-input-stream-source-sink \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala index b97b0f24a8..1b157e5b85 100644 --- a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala +++ b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala @@ -1,6 +1,6 @@ package docs -import java.io.File +import java.io.{ InputStream, File } import akka.http.scaladsl.model.Uri import akka.stream.scaladsl._ @@ -218,6 +218,29 @@ class MigrationsScala extends AkkaSpec { val someFileSink = Sink.file(new File(".")) //#file-source-sink + + class SomeInputStream extends java.io.InputStream { override def read(): Int = 0 } + class SomeOutputStream extends java.io.OutputStream { override def write(b: Int): Unit = () } + + //#input-output-stream-source-sink + val inputStreamSrc = Source.inputStream(() => new SomeInputStream()) + + val otherInputStreamSrc = Source.inputStream(() => new SomeInputStream()) + + val someOutputStreamSink = Sink.outputStream(() => new SomeOutputStream()) + //#input-output-stream-source-sink + + //#output-input-stream-source-sink + val timeout: FiniteDuration = 0.seconds + + val outputStreamSrc = Source.outputStream() + + val otherOutputStreamSrc = Source.outputStream(timeout) + + val someInputStreamSink = Sink.inputStream() + + val someOtherInputStreamSink = Sink.inputStream(timeout) + //#output-input-stream-source-sink } } } diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index f123e8e622..40ac6c43e5 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -454,15 +454,15 @@ should be replaced by: SynchronousFileSource and SynchronousFileSink ============================================ -Both have been replaced by `Source.file(…)` and `Sink.file(…)` due to discoverability issues +Both have been replaced by ``Source.file(…)`` and ``Sink.file(…)`` due to discoverability issues paired with names which leaked internal implementation details. Update procedure ---------------- -Replace `SynchronousFileSource(` and `SynchronousFileSource.apply(` with `Source.file(` +Replace ``SynchronousFileSource(`` and ``SynchronousFileSource.apply(`` with ``Source.file(`` -Replace `SynchronousFileSink(` and `SynchronousFileSink.apply(` with `Sink.file(` +Replace ``SynchronousFileSink(`` and ``SynchronousFileSink.apply(`` with ``Sink.file(`` Example ^^^^^^^ @@ -481,3 +481,66 @@ Example should be replaced by .. includecode:: code/docs/MigrationsScala.scala#file-source-sink + +InputStreamSource and OutputStreamSink +============================================ + +Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues. + +Update procedure +---------------- + +Replace ``InputStreamSource(`` and ``InputStreamSource.apply(`` with ``Source.inputStream(`` + +Replace ``OutputStreamSink(`` and ``OutputStreamSink.apply(`` with ``Sink.outputStream(`` + +Example +^^^^^^^ + +:: + + // This no longer works! + val inputStreamSrc = InputStreamSource(() => new SomeInputStream()) + + // This no longer works! + val otherInputStreamSrc = InputStreamSource(() => new SomeInputStream(), 1024) + + // This no longer works! + val someOutputStreamSink = OutputStreamSink(() => new SomeOutputStream()) + +should be replaced by + +.. includecode:: code/docs/MigrationsScala.scala#input-output-stream-source-sink + +OutputStreamSource and InputStreamSink +====================================== + +Both have been replaced by ``Source.outputStream(…)`` and ``Sink.inputStream(…)`` due to discoverability issues. + +Update procedure +---------------- + +Replace ``OutputStreamSource(`` and ``OutputStreamSource.apply(`` with ``Source.outputStream(`` + +Replace ``InputStreamSink(`` and ``InputStreamSink.apply(`` with ``Sink.inputStream(`` + +Example +^^^^^^^ + +:: + + // This no longer works! + val outputStreamSrc = OutputStreamSource() + + // This no longer works! + val otherOutputStreamSrc = OutputStreamSource(timeout) + + // This no longer works! + val someInputStreamSink = InputStreamSink() + + // This no longer works! + val someOtherInputStreamSink = InputStreamSink(timeout); + +should be replaced by + +.. includecode:: code/docs/MigrationsScala.scala#output-input-stream-source-sink \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala index c5b5c19f45..b5c7a5d37e 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala @@ -21,7 +21,7 @@ class StreamingResponseSpecs extends RoutingSpec { Get() ~> route ~> check { status should ===(StatusCodes.OK) - responseAs[String] should === ("") + responseAs[String] should ===("") } } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala index d0663088a5..950adf81b0 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala @@ -9,7 +9,6 @@ import java.io.File import java.net.{ URI, URL } import akka.stream.ActorAttributes -import akka.stream.io.{ InputStreamSource } import akka.stream.scaladsl.Source import scala.annotation.tailrec @@ -93,7 +92,7 @@ trait FileAndResourceDirectives { extractSettings { settings ⇒ complete { HttpEntity.Default(contentType, length, - InputStreamSource(() ⇒ url.openStream()) + Source.inputStream(() ⇒ url.openStream()) .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) } } diff --git a/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java b/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java index 875e9137e7..493f5f6128 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java @@ -36,7 +36,7 @@ public class InputStreamSinkTest extends StreamTest { public void mustReadEventViaInputStream() throws Exception { final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); - final Sink sink = InputStreamSink.create(timeout); + final Sink sink = Sink.inputStream(timeout); final List list = Collections.singletonList(ByteString.fromString("a")); final InputStream stream = Source.from(list).runWith(sink, materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java index ca5682c054..365240966c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java @@ -40,7 +40,7 @@ public class OutputStreamSourceTest extends StreamTest { final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); final JavaTestKit probe = new JavaTestKit(system); - final Source source = OutputStreamSource.create(timeout); + final Source source = Source.outputStream(timeout); final OutputStream s = source.to(Sink.foreach(new Procedure() { public void apply(ByteString elem) { probe.getRef().tell(elem, ActorRef.noSender()); diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index 396afcebf9..031ecb5675 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -80,7 +80,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "InputStreamSink" must { "read bytes from InputStream" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() probe.sendNext(byteString) val arr = newArray() @@ -113,7 +113,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() val data = randomArray(2) probe.sendNext(ByteString(data)) @@ -126,7 +126,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "block read until get requested number of bytes from upstream" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() val arr = newArray() val f = Future(inputStream.read(arr)) @@ -141,7 +141,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "fill up buffer by default" in assertAllStagesStopped { import system.dispatcher - val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() val array2 = randomArray(3) probe.sendNext(byteString) @@ -162,7 +162,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "throw error when reactive stream is closed" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() probe.sendNext(byteString) inputStream.close() @@ -188,7 +188,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "return -1 when read after stream is completed" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(InputStreamSink())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() probe.sendNext(byteString) val arr = newArray() @@ -229,9 +229,9 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val mat = ActorMaterializer()(sys) try { - TestSource.probe[ByteString].runWith(InputStreamSink())(mat) + TestSource.probe[ByteString].runWith(Sink.inputStream())(mat) mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "InputStreamSink").get + val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala index e9ace54352..3f23e655d0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -5,7 +5,7 @@ package akka.stream.io import java.io.InputStream -import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.{ Source, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } @@ -19,7 +19,7 @@ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaF "InputStreamSource" must { "read bytes from InputStream" in assertAllStagesStopped { - val f = InputStreamSource(() ⇒ new InputStream { + val f = Source.inputStream(() ⇒ new InputStream { @volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt) override def read(): Int = { buf match { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala index 2ea542c94c..70be176b5c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala @@ -5,7 +5,7 @@ package akka.stream.io import java.io.OutputStream -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Source, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } @@ -26,7 +26,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val datas = List(ByteString("a"), ByteString("c"), ByteString("c")) val completion = Source(datas) - .runWith(OutputStreamSink(() ⇒ new OutputStream { + .runWith(Sink.outputStream(() ⇒ new OutputStream { override def write(i: Int): Unit = () override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String })) @@ -40,7 +40,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "close underlying stream when error received" in assertAllStagesStopped { val p = TestProbe() Source.failed(new TE("Boom!")) - .runWith(OutputStreamSink(() ⇒ new OutputStream { + .runWith(Sink.outputStream(() ⇒ new OutputStream { override def write(i: Int): Unit = () override def close() = p.ref ! "closed" })) @@ -51,7 +51,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "close underlying stream when completion received" in assertAllStagesStopped { val p = TestProbe() Source.empty - .runWith(OutputStreamSink(() ⇒ new OutputStream { + .runWith(Sink.outputStream(() ⇒ new OutputStream { override def write(i: Int): Unit = () override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String override def close() = p.ref ! "closed" diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala index 3b36ce87fb..6622a4a0df 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala @@ -70,7 +70,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { "OutputStreamSource" must { "read bytes from OutputStream" in assertAllStagesStopped { - val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val s = probe.expectSubscription() outputStream.write(bytesArray) @@ -81,7 +81,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "block flush call until send all buffer to downstream" in assertAllStagesStopped { - val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val s = probe.expectSubscription() outputStream.write(bytesArray) @@ -99,7 +99,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "not block flushes when buffer is empty" in assertAllStagesStopped { - val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val s = probe.expectSubscription() outputStream.write(bytesArray) @@ -117,7 +117,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "block writes when buffer is full" in assertAllStagesStopped { - val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both) + val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both) .withAttributes(Attributes.inputBuffer(16, 16)).run val s = probe.expectSubscription() @@ -138,7 +138,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "throw error when write after stream is closed" in assertAllStagesStopped { - val (outputStream, probe) = OutputStreamSource().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run probe.expectSubscription() outputStream.close() @@ -151,9 +151,9 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val mat = ActorMaterializer()(sys) try { - OutputStreamSource().runWith(TestSink.probe[ByteString])(mat) + Source.outputStream().runWith(TestSink.probe[ByteString])(mat) mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "OutputStreamSource").get + val ref = expectMsgType[Children].children.find(_.path.toString contains "outputStreamSource").get assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 84f1dc9fec..af9721aad7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -81,9 +81,9 @@ private[stream] object Stages { val subscriberSource = name("subscriberSource") val actorPublisherSource = name("actorPublisherSource") val actorRefSource = name("actorRefSource") - val inputStreamSource = name("inputStreamSource") val acknowledgeSource = name("acknowledgeSource") - val outputStreamSource = name("outputStreamSource") + val inputStreamSource = name("inputStreamSource") and IODispatcher + val outputStreamSource = name("outputStreamSource") and IODispatcher val fileSource = name("fileSource") and IODispatcher val subscriberSink = name("subscriberSink") @@ -95,9 +95,9 @@ private[stream] object Stages { val ignoreSink = name("ignoreSink") val actorRefSink = name("actorRefSink") val actorSubscriberSink = name("actorSubscriberSink") - val outputStreamSink = name("outputStreamSink") val acknowledgeSink = name("acknowledgeSink") - val inputStreamSink = name("inputStreamSink") + val outputStreamSink = name("outputStreamSink") and IODispatcher + val inputStreamSink = name("inputStreamSink") and IODispatcher val fileSink = name("fileSource") and IODispatcher } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index aebeb964fa..109e181018 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -33,7 +33,7 @@ private[akka] object InputStreamSinkStage { /** * INTERNAL API */ -private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSink") { +private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSink") { val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") @@ -77,7 +77,7 @@ private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkSt } }) } - (logic, new InputStreamAdapter(dataQueue, logic.wakeUp, timeout)) + (logic, new InputStreamAdapter(dataQueue, logic.wakeUp, readTimeout)) } } @@ -87,7 +87,7 @@ private[akka] class InputStreamSinkStage(timeout: FiniteDuration) extends SinkSt */ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapterMessage], sendToStage: (AdapterToStageMessage) ⇒ Unit, - timeout: FiniteDuration) + readTimeout: FiniteDuration) extends InputStream { var isActive = true @@ -118,7 +118,7 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt detachedChunk match { case None ⇒ try { - sharedBuffer.poll(timeout.toMillis, TimeUnit.MILLISECONDS) match { + sharedBuffer.poll(readTimeout.toMillis, TimeUnit.MILLISECONDS) match { case Data(data) ⇒ detachedChunk = Some(data) readBytes(a, begin, length) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index 777cacf7a6..1fa1cf555c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -32,7 +32,7 @@ private[akka] object OutputStreamSourceStage { } } -private[akka] class OutputStreamSourceStage(timeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSource") { +private[akka] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSource") { val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") @@ -110,14 +110,14 @@ private[akka] class OutputStreamSourceStage(timeout: FiniteDuration) extends Sou } }) } - (logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, timeout)) + (logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout)) } } private[akka] class OutputStreamAdapter(dataQueue: BlockingQueue[ByteString], downstreamStatus: AtomicReference[DownstreamStatus], sendToStage: (AdapterToStageMessage) ⇒ Future[Unit], - timeout: FiniteDuration) + writeTimeout: FiniteDuration) extends OutputStream { var isActive = true @@ -148,7 +148,7 @@ private[akka] class OutputStreamAdapter(dataQueue: BlockingQueue[ByteString], private[this] def sendMessage(message: AdapterToStageMessage, handleCancelled: Boolean = true) = send(() ⇒ try { - Await.ready(sendToStage(message), timeout) + Await.ready(sendToStage(message), writeTimeout) if (downstreamStatus.get() == Canceled && handleCancelled) { //Publisher considered to be terminated at earliest convenience to minimize messages sending back and forth isPublisherAlive = false diff --git a/akka-stream/src/main/scala/akka/stream/io/Implicits.scala b/akka-stream/src/main/scala/akka/stream/io/Implicits.scala deleted file mode 100644 index 9e92f5fc62..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/Implicits.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.io - -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source - -import scala.language.implicitConversions - -/** - * Provides implicit conversions such that sources and sinks contained within `akka.stream.io` - * as if they were defined on [[Source]] or [[Sink]] directly. - * - * Example: - * {{{ - * import akka.stream.scaladsl.Source - * import akka.stream.io._ - * - * // explicitly using IO Source: - * FileSource(file).map(...) - * - * // using implicit conversion: - * import akka.stream.io.Implicits._ - * Source.synchronousFile(file).map(...) - * }}} - */ -object Implicits { - - // ---- Sources ---- - - implicit final class AddInputStreamSource(val s: Source.type) extends AnyVal { - def inputStream: InputStreamSource.type = InputStreamSource - } - - // ---- Sinks ---- - - implicit final class AddOutputStreamSink(val s: Sink.type) extends AnyVal { - def outputStream: OutputStreamSink.type = OutputStreamSink - } -} diff --git a/akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala deleted file mode 100644 index d6dc0cb396..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/InputStreamSink.scala +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.io - -import java.io.InputStream -import java.lang.{ Long ⇒ JLong } - -import akka.stream.impl.io.InputStreamSinkStage -import akka.stream.scaladsl.Sink -import akka.stream.{ Attributes, ActorAttributes, javadsl } -import akka.util.ByteString - -import scala.concurrent.duration.{ FiniteDuration, _ } -import scala.language.postfixOps - -/** - * Sink which allows to use [[java.io.InputStream]] to interact with reactive stream. - */ -object InputStreamSink { - - /** - * Creates a synchronous (Java 6 compatible) Sink - * - * It materializes an [[java.io.InputStream]] to interacting with reactive stream. - * - * This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. - */ - def apply(timeout: FiniteDuration = 5 seconds): Sink[ByteString, InputStream] = - Sink.fromGraph(new InputStreamSinkStage(timeout)) - .withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and - Attributes.name("InputStreamSink")) - - /** - * Creates a synchronous (Java 6 compatible) Sink - * - * It materializes an [[java.io.InputStream]] to interacting with reactive stream. - * - * This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, - * unless configured otherwise by using [[akka.stream.ActorAttributes]]. - */ - def create(): javadsl.Sink[ByteString, InputStream] = - new javadsl.Sink(apply()) - - /** - * Creates a synchronous (Java 6 compatible) Sink - * - * It materializes an [[java.io.InputStream]] to interacting with reactive stream. - * - * This sink is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, - * unless configured otherwise by using [[akka.stream.ActorAttributes]]. - */ - def create(timeout: FiniteDuration): javadsl.Sink[ByteString, InputStream] = - new javadsl.Sink(apply(timeout)) - -} diff --git a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala deleted file mode 100644 index 34eae30aeb..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/InputStreamSource.scala +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.io - -import java.io.InputStream - -import akka.japi.function.Creator -import akka.stream.impl.io.InputStreamSource -import akka.stream.scaladsl.Source -import akka.stream.scaladsl.Source._ -import akka.stream.{ Attributes, javadsl } -import akka.util.ByteString - -import scala.concurrent.Future - -object InputStreamSource { - - final val DefaultChunkSize = 8192 - final val DefaultAttributes = Attributes.name("inputStreamSource") - - /** - * Creates a Source that will pull data out of the given input stream. - * Emitted elements are `chunkSize` sized [[ByteString]] elements. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def apply(createInputStream: () ⇒ InputStream, chunkSize: Int = DefaultChunkSize): Source[ByteString, Future[Long]] = - new Source(new InputStreamSource(createInputStream, chunkSize, DefaultAttributes, shape("InputStreamSource"))) - - /** - * Java API - * - * Creates a Source that will pull data out of the given input stream. - * Emitted elements are [[ByteString]] elements, chunked by default by [[DefaultChunkSize]] bytes. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def create(createInputStream: Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] = - create(createInputStream, DefaultChunkSize) - - /** - * Java API - * - * Creates a Source that will pull data out of the given input stream. - * Emitted elements are `chunkSize` sized [[ByteString]] elements. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def create(createInputStream: Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = - apply(() ⇒ createInputStream.create(), chunkSize).asJava.asInstanceOf[javadsl.Source[ByteString, Future[java.lang.Long]]] - -} diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala deleted file mode 100644 index 833862bd5a..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSink.scala +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright (C) 2014-2015 Typesafe Inc. - */ -package akka.stream.io - -import java.io.OutputStream - -import akka.japi.function.Creator -import akka.stream.impl.io.OutputStreamSink -import akka.stream.scaladsl.Sink -import akka.stream.{ ActorAttributes, Attributes, javadsl } -import akka.util.ByteString - -import scala.concurrent.Future - -/** - * Sink which writes incoming [[ByteString]]s to the given [[OutputStream]]. - */ -object OutputStreamSink { - - final val DefaultAttributes = Attributes.name("outputStreamSink") - - /** - * Sink which writes incoming [[ByteString]]s to the given [[OutputStream]]. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. - */ - def apply(output: () ⇒ OutputStream): Sink[ByteString, Future[Long]] = - new Sink(new OutputStreamSink(output, DefaultAttributes, Sink.shape("OutputStreamSink"))) - - /** - * Java API - * - * Sink which writes incoming [[ByteString]]s to the given [[OutputStream]]. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - */ - def create(f: Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] = - apply(() ⇒ f.create()).asJava.asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] - -} diff --git a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala deleted file mode 100644 index b3f7959958..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/OutputStreamSource.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.io - -import java.io.OutputStream - -import akka.stream.Attributes.Name -import akka.stream._ -import akka.stream.impl.io.OutputStreamSourceStage -import akka.stream.scaladsl.{ FlowGraph, Source } -import akka.util.ByteString - -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.language.implicitConversions - -/** - * Source which allows to use [[java.io.OutputStream]] to interact with reactive stream. - */ -object OutputStreamSource { - import scala.language.postfixOps - - /** - * Creates a synchronous (Java 6 compatible) Source. - * - * It materializes an [[java.io.OutputStream]] to interact with reactive stream. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, - * unless configured otherwise by using [[akka.stream.ActorAttributes]]. - */ - def apply(timeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] = - Source.fromGraph(new OutputStreamSourceStage(timeout)) - .withAttributes(ActorAttributes.dispatcher("akka.stream.default-blocking-io-dispatcher") and - Attributes.name("OutputStreamSource")) - - /** - * Creates a synchronous (Java 6 compatible) Source. - * - * It materializes an [[java.io.OutputStream]] to interact with reactive stream. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, - * unless configured otherwise by using [[akka.stream.ActorAttributes]]. - */ - def create(): javadsl.Source[ByteString, OutputStream] = - new javadsl.Source(apply()) - - /** - * Creates a synchronous (Java 6 compatible) Source. - * - * It materializes an [[java.io.OutputStream]] to interacting with reactive stream. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.default-blocking-io-dispatcher`, - * unless configured otherwise by using [[akka.stream.ActorAttributes]]. - */ - def create(timeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = { - new javadsl.Source(apply(timeout)) - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index f9f9644a0c..fdac8dd116 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -3,7 +3,7 @@ */ package akka.stream.javadsl -import java.io.File +import java.io.{ InputStream, OutputStream, File } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts @@ -182,6 +182,8 @@ object Sink { * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. + * + * @param f The file to write to */ def file(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = file(f, append = false) @@ -193,10 +195,53 @@ object Sink { * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or * set it for a given Source by using [[ActorAttributes]]. + * + * @param f The file to write to + * @param append Whether or not the file should be overwritten or appended to */ def file(f: File, append: Boolean): javadsl.Sink[ByteString, Future[java.lang.Long]] = new Sink(scaladsl.Sink.file(f, append)).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] + /** + * Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f A Creator which creates an OutputStream to write to + */ + def outputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] = + new Sink(scaladsl.Sink.outputStream(() ⇒ f.create())).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] + + /** + * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible + * to read the values produced by the stream this Sink is attached to. + * + * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] to explicitly + * configure the timeout. + * + * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def inputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.Sink.inputStream()) + + /** + * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible + * to read the values produced by the stream this Sink is attached to. + * + * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param readTimeout the max time the read operation on the materialized InputStream should block + */ + def inputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] = + new Sink(scaladsl.Sink.inputStream(readTimeout)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 36786101a3..b0f98bc9fd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3,7 +3,7 @@ */ package akka.stream.javadsl -import java.io.File +import java.io.{ OutputStream, InputStream, File } import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter @@ -260,8 +260,60 @@ object Source { * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. */ - def file(f: File, chunkSize: Int): Source[ByteString, Future[java.lang.Long]] = + def file(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = new Source(scaladsl.Source.file(f, chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] + + /** + * Creates a Source from an [[java.io.InputStream]] created by the given function. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def inputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = + new Source(scaladsl.Source.inputStream(() ⇒ in.create(), chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] + + /** + * Creates a Source from an [[java.io.InputStream]] created by the given function. + * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, + * except the last element, which will be up to 8192 in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def inputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] = inputStream(in, 8192) + + /** + * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible + * to write the ByteStrings to the stream this Source is attached to. + * + * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param writeTimeout the max time the write operation on the materialized OutputStream should block + */ + def outputStream(writeTimeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = + new Source(scaladsl.Source.outputStream(writeTimeout)) + + /** + * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible + * to write the ByteStrings to the stream this Source is attached to. The write timeout for OutputStreams + * materialized will default to 5 seconds, @see [[#outputStream(FiniteDuration)]] if you want to override it. + * + * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def outputStream(): javadsl.Source[ByteString, OutputStream] = + new Source(scaladsl.Source.outputStream()) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index c72a775ddd..dd21691969 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,7 +3,7 @@ */ package akka.stream.scaladsl -import java.io.File +import java.io.{ InputStream, OutputStream, File } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts @@ -11,7 +11,7 @@ import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ -import akka.stream.impl.io.FileSink +import akka.stream.impl.io.{ InputStreamSinkStage, OutputStreamSink, FileSink } import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective } import akka.stream.{ javadsl, _ } import akka.util.ByteString @@ -255,4 +255,29 @@ object Sink { */ def file(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = new Sink(new FileSink(f, append, DefaultAttributes.fileSink, shape("FileSink"))) + + /** + * Creates a Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def outputStream(out: () ⇒ OutputStream): Sink[ByteString, Future[Long]] = + new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, shape("OutputStreamSink"))) + + /** + * Creates a Sink which when materialized will return an [[InputStream]] which it is possible + * to read the values produced by the stream this Sink is attached to. + * + * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param readTimeout the max time the read operation on the materialized InputStream should block + */ + def inputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] = + Sink.fromGraph(new InputStreamSinkStage(readTimeout)).withAttributes(DefaultAttributes.inputStreamSink) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 1856dcb567..046390be76 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,14 +3,14 @@ */ package akka.stream.scaladsl -import java.io.File +import java.io.{ OutputStream, InputStream, File } import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.{ DefaultAttributes, StageModule } import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.GraphStages.TickSource -import akka.stream.impl.io.FileSource +import akka.stream.impl.io.{ OutputStreamSourceStage, InputStreamSource, FileSource } import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } import akka.stream.{ Outlet, SourceShape, _ } import akka.util.ByteString @@ -382,8 +382,41 @@ object Source { * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * + * @param f the File to read from + * @param chunkSize the size of each read operation, defaults to 8192 */ def file(f: File, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, shape("FileSource"))) + /** + * Creates a Source from an [[InputStream]] created by the given function. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * + * @param in a function which creates the InputStream to read from + * @param chunkSize the size of each read operation, defaults to 8192 + */ + def inputStream(in: () ⇒ InputStream, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = + new Source(new InputStreamSource(in, chunkSize, DefaultAttributes.inputStreamSource, shape("InputStreamSource"))) + + /** + * Creates a Source which when materialized will return an [[OutputStream]] which it is possible + * to write the ByteStrings to the stream this Source is attached to. + * + * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds + */ + def outputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] = + Source.fromGraph(new OutputStreamSourceStage(writeTimeout)).withAttributes(DefaultAttributes.outputStreamSource) + }