diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala index f437ef41ad..eb4dc05bac 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -36,7 +36,7 @@ class FileSourcesBenchmark { val ft = Source.fromIterator(() ⇒ Iterator.continually(line)) .take(10 * 39062) // adjust as needed - .runWith(Sink.file(f)) + .runWith(FileIO.toFile(f)) Await.result(ft, 30.seconds) f @@ -51,8 +51,8 @@ class FileSourcesBenchmark { @Setup def setup() { - fileChannelSource = Source.file(file, bufSize) - fileInputStreamSource = Source.inputStream(() ⇒ new FileInputStream(file), bufSize) + fileChannelSource = FileIO.fromFile(file, bufSize) + fileInputStreamSource = StreamConverters.fromInputStream(() ⇒ new FileInputStream(file), bufSize) ioSourceLinesIterator = Source.fromIterator(() ⇒ scala.io.Source.fromFile(file).getLines()).map(ByteString(_)) } diff --git a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java index b4d7ab5133..7e86a92eea 100644 --- a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java +++ b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java @@ -195,32 +195,32 @@ public class MigrationsJava { //#file-source-sink final Source> fileSrc = - Source.file(new File(".")); + FileIO.fromFile(new File(".")); final Source> otherFileSrc = - Source.file(new File("."), 1024); + FileIO.fromFile(new File("."), 1024); final Sink> fileSink = - Sink.file(new File(".")); + FileIO.toFile(new File(".")); //#file-source-sink //#input-output-stream-source-sink final Source> inputStreamSrc = - Source.inputStream(new Creator(){ + StreamConverters.fromInputStream(new Creator(){ public InputStream create() { return new SomeInputStream(); } }); final Source> otherInputStreamSrc = - Source.inputStream(new Creator(){ + StreamConverters.fromInputStream(new Creator(){ public InputStream create() { return new SomeInputStream(); } }, 1024); final Sink> outputStreamSink = - Sink.outputStream(new Creator(){ + StreamConverters.fromOutputStream(new Creator(){ public OutputStream create() { return new SomeOutputStream(); } @@ -232,16 +232,16 @@ public class MigrationsJava { final FiniteDuration timeout = FiniteDuration.Zero(); final Source outputStreamSrc = - Source.outputStream(); + StreamConverters.asOutputStream(); final Source otherOutputStreamSrc = - Source.outputStream(timeout); + StreamConverters.asOutputStream(timeout); final Sink someInputStreamSink = - Sink.inputStream(); + StreamConverters.asInputStream(); final Sink someOtherInputStreamSink = - Sink.inputStream(timeout); + StreamConverters.asInputStream(timeout); //#output-input-stream-source-sink } diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index 8799dfdfac..ad8278d243 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -562,17 +562,17 @@ should be replaced by: SynchronousFileSource and SynchronousFileSink ============================================= -Both have been replaced by ``Source.file(…)`` and ``Sink.file(…)`` due to discoverability issues +Both have been replaced by ``FileIO.toFile(…)`` and ``FileIO.fromFile(…)`` due to discoverability issues paired with names which leaked internal implementation details. Update procedure ---------------- -Replace ``SynchronousFileSource.create(`` with ``Source.file(`` +Replace ``SynchronousFileSource.create(`` with ``FileIO.fromFile(`` -Replace ``SynchronousFileSink.create(`` with ``Sink.file(`` +Replace ``SynchronousFileSink.create(`` with ``FileIO.toFile(`` -Replace ``SynchronousFileSink.appendTo(f)`` with ``Sink.file(f, true)`` +Replace ``SynchronousFileSink.appendTo(f)`` with ``FileIO.toFile(f, true)`` Example ^^^^^^^ @@ -598,14 +598,14 @@ should be replaced by InputStreamSource and OutputStreamSink ====================================== -Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues. +Both have been replaced by ``StreamConverters.fromInputStream(…)`` and ``StreamConverters.fromOutputStream(…)`` due to discoverability issues. Update procedure ---------------- -Replace ``InputStreamSource.create(`` with ``Source.inputStream(`` +Replace ``InputStreamSource.create(`` with ``StreamConverters.fromInputStream(`` -Replace ``OutputStreamSink.create(`` with ``Sink.outputStream(`` +Replace ``OutputStreamSink.create(`` with ``StreamConverters.fromOutputStream(`` Example ^^^^^^^ @@ -644,14 +644,14 @@ should be replaced by OutputStreamSource and InputStreamSink ====================================== -Both have been replaced by ``Source.outputStream(…)`` and ``Sink.inputStream(…)`` due to discoverability issues. +Both have been replaced by ``StreamConverters.asOutputStream(…)`` and ``StreamConverters.asInputStream(…)`` due to discoverability issues. Update procedure ---------------- -Replace ``OutputStreamSource.create(`` with ``Source.outputStream(`` +Replace ``OutputStreamSource.create(`` with ``StreamConverters.asOutputStream(`` -Replace ``InputStreamSink.create(`` with ``Sink.inputStream(`` +Replace ``InputStreamSink.create(`` with ``StreamConverters.asInputStream(`` Example ^^^^^^^ diff --git a/akka-docs-dev/rst/java/stream-io.rst b/akka-docs-dev/rst/java/stream-io.rst index b3f067845b..743c2b5be8 100644 --- a/akka-docs-dev/rst/java/stream-io.rst +++ b/akka-docs-dev/rst/java/stream-io.rst @@ -110,7 +110,7 @@ on files. Once Akka is free to require JDK8 (from ``2.4.x``) these implementations will be updated to make use of the new NIO APIs (i.e. :class:`AsynchronousFileChannel`). -Streaming data from a file is as easy as creating a `Source.file` given a target file, and an optional +Streaming data from a file is as easy as creating a `FileIO.fromFile` given a target file, and an optional ``chunkSize`` which determines the buffer size determined as one "element" in such stream: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamFileDocTest.java#file-source diff --git a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala index 0dd05867d8..37357c098b 100644 --- a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala +++ b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala @@ -245,34 +245,34 @@ class MigrationsScala extends AkkaSpec { //#query-param //#file-source-sink - val fileSrc = Source.file(new File(".")) + val fileSrc = FileIO.fromFile(new File(".")) - val otherFileSrc = Source.file(new File("."), 1024) + val otherFileSrc = FileIO.fromFile(new File("."), 1024) - val someFileSink = Sink.file(new File(".")) + val someFileSink = FileIO.toFile(new File(".")) //#file-source-sink class SomeInputStream extends java.io.InputStream { override def read(): Int = 0 } class SomeOutputStream extends java.io.OutputStream { override def write(b: Int): Unit = () } //#input-output-stream-source-sink - val inputStreamSrc = Source.inputStream(() => new SomeInputStream()) + val inputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream()) - val otherInputStreamSrc = Source.inputStream(() => new SomeInputStream()) + val otherInputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream()) - val someOutputStreamSink = Sink.outputStream(() => new SomeOutputStream()) + val someOutputStreamSink = StreamConverters.fromOutputStream(() => new SomeOutputStream()) //#input-output-stream-source-sink //#output-input-stream-source-sink val timeout: FiniteDuration = 0.seconds - val outputStreamSrc = Source.outputStream() + val outputStreamSrc = StreamConverters.asOutputStream() - val otherOutputStreamSrc = Source.outputStream(timeout) + val otherOutputStreamSrc = StreamConverters.asOutputStream(timeout) - val someInputStreamSink = Sink.inputStream() + val someInputStreamSink = StreamConverters.asInputStream() - val someOtherInputStreamSink = Sink.inputStream(timeout) + val someOtherInputStreamSink = StreamConverters.asInputStream(timeout) //#output-input-stream-source-sink } } diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/FileUploadExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/FileUploadExamplesSpec.scala index e1860568f5..4963a2a0c2 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/FileUploadExamplesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/FileUploadExamplesSpec.scala @@ -34,7 +34,7 @@ class FileUploadExamplesSpec extends RoutingSpec { // stream into a file as the chunks of it arrives and return a future // file to where it got stored val file = File.createTempFile("upload", "tmp") - b.entity.dataBytes.runWith(Sink.file(file)).map(_ => + b.entity.dataBytes.runWith(FileIO.toFile(file)).map(_ => (b.name -> file)) case b: BodyPart => diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala index fcf6bccba9..8efe448b13 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala @@ -14,7 +14,7 @@ import akka.http.scaladsl.model.headers.{ Server, RawHeader } import akka.http.scaladsl.server.RouteResult.{ Complete, Rejected } import akka.http.scaladsl.server._ import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.scaladsl.{FileIO, Sink, Source} import akka.util.ByteString import scala.concurrent.Future @@ -174,7 +174,7 @@ class BasicDirectivesExamplesSpec extends RoutingSpec { path("sample") { complete { // internally uses the configured fileIODispatcher: - val source = Source.file(new File("example.json")) + val source = FileIO.fromFile(new File("example.json")) HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, source)) } } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala index 1a66b582b6..5cb634444f 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -6,8 +6,7 @@ package docs.stream.io import java.io.File import akka.stream._ -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.util.ByteString @@ -46,7 +45,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { //#file-source - val foreach: Future[Long] = Source.file(file) + val foreach: Future[Long] = FileIO.fromFile(file) .to(Sink.ignore) .run() //#file-source @@ -54,7 +53,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { "configure dispatcher in code" in { //#custom-dispatcher-code - Sink.file(file) + FileIO.fromFile(file) .withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher")) //#custom-dispatcher-code } diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index 23664831ab..e416b9fa2d 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -609,15 +609,17 @@ should be replaced by: SynchronousFileSource and SynchronousFileSink ============================================= -Both have been replaced by ``Source.file(…)`` and ``Sink.file(…)`` due to discoverability issues + +``SynchronousFileSource`` and ``SynchronousFileSink`` +have been replaced by ``FileIO.read(…)`` and ``FileIO.write(…)`` due to discoverability issues paired with names which leaked internal implementation details. Update procedure ---------------- -Replace ``SynchronousFileSource(`` and ``SynchronousFileSource.apply(`` with ``Source.file(`` +Replace ``SynchronousFileSource(`` and ``SynchronousFileSource.apply(`` with ``FileIO.fromFile(`` -Replace ``SynchronousFileSink(`` and ``SynchronousFileSink.apply(`` with ``Sink.file(`` +Replace ``SynchronousFileSink(`` and ``SynchronousFileSink.apply(`` with ``FileIO.toFile(`` Example ^^^^^^^ @@ -633,6 +635,7 @@ Example // This no longer works! val someFileSink = SynchronousFileSink(new File(".")) + should be replaced by .. includecode:: code/docs/MigrationsScala.scala#file-source-sink @@ -640,14 +643,14 @@ should be replaced by InputStreamSource and OutputStreamSink ====================================== -Both have been replaced by ``Source.inputStream(…)`` and ``Sink.outputStream(…)`` due to discoverability issues. +Both have been replaced by ``StreamConverters.fromInputStream(…)`` and ``StreamConverters.fromOutputStream(…)`` due to discoverability issues. Update procedure ---------------- -Replace ``InputStreamSource(`` and ``InputStreamSource.apply(`` with ``Source.inputStream(`` - -Replace ``OutputStreamSink(`` and ``OutputStreamSink.apply(`` with ``Sink.outputStream(`` +Replace ``InputStreamSource(`` and ``InputStreamSource.apply(`` with ``StreamConverters.fromInputStream(`` +i +Replace ``OutputStreamSink(`` and ``OutputStreamSink.apply(`` with ``StreamConverters.fromOutputStream(`` Example ^^^^^^^ @@ -670,14 +673,14 @@ should be replaced by OutputStreamSource and InputStreamSink ====================================== -Both have been replaced by ``Source.outputStream(…)`` and ``Sink.inputStream(…)`` due to discoverability issues. +Both have been replaced by ``StreamConverters.asOutputStream(…)`` and ``StreamConverters.asInputStream(…)`` due to discoverability issues. Update procedure ---------------- -Replace ``OutputStreamSource(`` and ``OutputStreamSource.apply(`` with ``Source.outputStream(`` +Replace ``OutputStreamSource(`` and ``OutputStreamSource.apply(`` with ``StreamConverters.asOutputStream(`` -Replace ``InputStreamSink(`` and ``InputStreamSink.apply(`` with ``Sink.inputStream(`` +Replace ``InputStreamSink(`` and ``InputStreamSink.apply(`` with ``StreamConverters.asInputStream(`` Example ^^^^^^^ @@ -698,4 +701,4 @@ Example should be replaced by -.. includecode:: code/docs/MigrationsScala.scala#output-input-stream-source-sink +.. includecode:: code/docs/MigrationsScala.scala#output-input-stream-source-sink \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst index c7ec89e7ef..04d695be3e 100644 --- a/akka-docs-dev/rst/scala/stream-io.rst +++ b/akka-docs-dev/rst/scala/stream-io.rst @@ -110,7 +110,7 @@ on files. Once Akka is free to require JDK8 (from ``2.4.x``) these implementations will be updated to make use of the new NIO APIs (i.e. :class:`AsynchronousFileChannel`). -Streaming data from a file is as easy as creating a `Source.file` given a target file, and an optional +Streaming data from a file is as easy as creating a `FileIO.fromFile` given a target file, and an optional ``chunkSize`` which determines the buffer size determined as one "element" in such stream: .. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#file-source diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 22f5bd5a41..6f306d37b6 100755 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -197,7 +197,7 @@ object HttpEntity { val fileLength = file.length if (fileLength > 0) Default(contentType, fileLength, - if (chunkSize > 0) Source.file(file, chunkSize) else Source.file(file)) + if (chunkSize > 0) FileIO.fromFile(file, chunkSize) else FileIO.fromFile(file)) else empty(contentType) } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala index a16978e02a..c532cae7f3 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala @@ -9,7 +9,7 @@ import java.io.File import java.net.{ URI, URL } import akka.stream.ActorAttributes -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ FileIO, StreamConverters } import scala.annotation.tailrec import akka.actor.ActorSystem @@ -53,7 +53,7 @@ trait FileAndResourceDirectives { withRangeSupportAndPrecompressedMediaTypeSupportAndExtractSettings { settings ⇒ complete { HttpEntity.Default(contentType, file.length, - Source.file(file).withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) + FileIO.fromFile(file).withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) } } } else complete(HttpEntity.Empty) @@ -90,7 +90,7 @@ trait FileAndResourceDirectives { withRangeSupportAndPrecompressedMediaTypeSupportAndExtractSettings { settings ⇒ complete { HttpEntity.Default(contentType, length, - Source.inputStream(() ⇒ url.openStream()) + StreamConverters.fromInputStream(() ⇒ url.openStream()) .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) } } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala index 8db4667f76..6a5e33d792 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala @@ -34,7 +34,7 @@ trait FileUploadDirectives { case (fileInfo, bytes) ⇒ val destination = File.createTempFile("akka-http-upload", ".tmp") - val uploadedF: Future[(FileInfo, File)] = bytes.runWith(Sink.file(destination)) + val uploadedF: Future[(FileInfo, File)] = bytes.runWith(FileIO.toFile(destination)) .map(_ ⇒ (fileInfo, destination)) onComplete[(FileInfo, File)](uploadedF).flatMap { diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala index ed1da8da97..f36813b48c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala @@ -7,6 +7,7 @@ import java.io.{ File, FileWriter } import akka.actor.ActorSystem import akka.event.Logging +import akka.stream.scaladsl.FileIO import akka.stream.scaladsl.{ Source, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -36,7 +37,7 @@ class FilePublisherTest extends AkkaPublisherVerification[ByteString] { } def createPublisher(elements: Long): Publisher[ByteString] = - Source.file(file, chunkSize = 512) + FileIO.fromFile(file, chunkSize = 512) .take(elements) .runWith(Sink.asPublisher(false)) diff --git a/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java b/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java index 493f5f6128..bc8f3ea867 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/InputStreamSinkTest.java @@ -8,6 +8,7 @@ import akka.stream.StreamTest; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; +import akka.stream.javadsl.StreamConverters; import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.Utils; import akka.util.ByteString; @@ -36,7 +37,7 @@ public class InputStreamSinkTest extends StreamTest { public void mustReadEventViaInputStream() throws Exception { final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); - final Sink sink = Sink.inputStream(timeout); + final Sink sink = StreamConverters.asInputStream(timeout); final List list = Collections.singletonList(ByteString.fromString("a")); final InputStream stream = Source.from(list).runWith(sink, materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java index 365240966c..927d3f492d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java @@ -7,10 +7,7 @@ import akka.actor.ActorRef; import akka.japi.Pair; import akka.japi.function.Procedure; import akka.stream.StreamTest; -import akka.stream.javadsl.AkkaJUnitActorSystemResource; -import akka.stream.javadsl.Keep; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; +import akka.stream.javadsl.*; import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.Utils; import akka.testkit.JavaTestKit; @@ -40,7 +37,7 @@ public class OutputStreamSourceTest extends StreamTest { final FiniteDuration timeout = FiniteDuration.create(300, TimeUnit.MILLISECONDS); final JavaTestKit probe = new JavaTestKit(system); - final Source source = Source.outputStream(timeout); + final Source source = StreamConverters.asOutputStream(timeout); final OutputStream s = source.to(Sink.foreach(new Procedure() { public void apply(ByteString elem) { probe.getRef().tell(elem, ActorRef.noSender()); diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 16b3794e83..8924f4b78c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -78,7 +78,9 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { TestCase("Balance", scaladsl.Balance.getClass, javadsl.Balance.getClass), TestCase("Zip", scaladsl.Zip.getClass, javadsl.Zip.getClass), TestCase("UnZip", scaladsl.Unzip.getClass, javadsl.Unzip.getClass), - TestCase("Concat", scaladsl.Concat.getClass, javadsl.Concat.getClass)) + TestCase("Concat", scaladsl.Concat.getClass, javadsl.Concat.getClass), + TestCase("FileIO", scaladsl.FileIO.getClass, javadsl.FileIO.getClass), + TestCase("StreamConverters", scaladsl.StreamConverters.getClass, javadsl.StreamConverters.getClass)) "Java DSL" must provide { testCases foreach { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 03d5ebe839..8d7e2bfcb0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -9,7 +9,7 @@ import akka.actor.ActorSystem import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.StreamTestKit @@ -45,7 +45,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "write lines to a file" in assertAllStagesStopped { targetFile { f ⇒ val completion = Source(TestByteStrings) - .runWith(Sink.file(f)) + .runWith(FileIO.toFile(f)) val size = Await.result(completion, 3.seconds) size should equal(6006) @@ -58,7 +58,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { def write(lines: List[String]) = Source(lines) .map(ByteString(_)) - .runWith(Sink.file(f)) + .runWith(FileIO.toFile(f)) val completion1 = write(TestLines) Await.result(completion1, 3.seconds) @@ -77,7 +77,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { def write(lines: List[String] = TestLines) = Source(lines) .map(ByteString(_)) - .runWith(Sink.file(f, append = true)) + .runWith(FileIO.toFile(f, append = true)) val completion1 = write() val written1 = Await.result(completion1, 3.seconds) @@ -98,7 +98,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(3.seconds) try { - Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(Sink.file(f))(materializer) + Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(FileIO.toFile(f))(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get @@ -117,7 +117,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { try { Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)) - .to(Sink.file(f)) + .to(FileIO.toFile(f)) .withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")) .run()(materializer) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index 09dc04b5cc..9c45661c30 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -16,7 +16,7 @@ import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children import akka.stream.io.FileSourceSpec.Settings -import akka.stream.scaladsl.{ Source, Sink } +import akka.stream.scaladsl.{ FileIO, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink @@ -74,7 +74,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val chunkSize = 512 val bufferAttributes = Attributes.inputBuffer(1, 2) - val p = Source.file(testFile, chunkSize) + val p = FileIO.fromFile(testFile, chunkSize) .withAttributes(bufferAttributes) .runWith(Sink.asPublisher(false)) val c = TestSubscriber.manualProbe[ByteString]() @@ -111,7 +111,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val demandAllButOneChunks = TestText.length / chunkSize - 1 - val p = Source.file(testFile, chunkSize) + val p = FileIO.fromFile(testFile, chunkSize) .withAttributes(bufferAttributes) .runWith(Sink.asPublisher(false)) @@ -140,7 +140,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "onError whent trying to read from file which does not exist" in assertAllStagesStopped { - val p = Source.file(notExistingFile).runWith(Sink.asPublisher(false)) + val p = FileIO.fromFile(notExistingFile).runWith(Sink.asPublisher(false)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) @@ -156,7 +156,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { import settings._ s"count lines in real file (chunkSize = $chunkSize, readAhead = $readAhead)" in { - val s = Source.file(manyLines, chunkSize = chunkSize) + val s = FileIO.fromFile(manyLines, chunkSize = chunkSize) .withAttributes(Attributes.inputBuffer(readAhead, readAhead)) val f = s.runWith(Sink.fold(0) { case (acc, l) ⇒ acc + l.utf8String.count(_ == '\n') }) @@ -172,7 +172,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(500.millis) try { - val p = Source.file(manyLines).runWith(TestSink.probe)(materializer) + val p = FileIO.fromFile(manyLines).runWith(TestSink.probe)(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get @@ -188,7 +188,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(500.millis) try { - val p = Source.file(manyLines) + val p = FileIO.fromFile(manyLines) .withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")) .runWith(TestSink.probe)(materializer) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index 1da67456d7..fa948e05e9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -11,7 +11,7 @@ import akka.stream._ import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.InputStreamSinkStage import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } -import akka.stream.scaladsl.{ Keep, Sink } +import akka.stream.scaladsl.{ Keep, Sink, StreamConverters } import akka.stream.stage.InHandler import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils._ @@ -80,7 +80,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "InputStreamSink" must { "read bytes from InputStream" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() probe.sendNext(byteString) val arr = newArray() @@ -113,7 +113,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() val data = randomArray(2) probe.sendNext(ByteString(data)) @@ -126,7 +126,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "block read until get requested number of bytes from upstream" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() val arr = newArray() val f = Future(inputStream.read(arr)) @@ -141,7 +141,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "fill up buffer by default" in assertAllStagesStopped { import system.dispatcher - val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() val array2 = randomArray(3) probe.sendNext(byteString) @@ -162,7 +162,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "throw error when reactive stream is closed" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() probe.sendNext(byteString) inputStream.close() @@ -188,7 +188,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "return -1 when read after stream is completed" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(Sink.inputStream())(Keep.both).run() + val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() probe.sendNext(byteString) val arr = newArray() @@ -229,7 +229,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val materializer = ActorMaterializer()(sys) try { - TestSource.probe[ByteString].runWith(Sink.inputStream())(materializer) + TestSource.probe[ByteString].runWith(StreamConverters.asInputStream())(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "inputStreamSink").get assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala index 3f23e655d0..3adcae3150 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -5,7 +5,8 @@ package akka.stream.io import java.io.InputStream -import akka.stream.scaladsl.{ Source, Sink } +import akka.stream.scaladsl.StreamConverters +import akka.stream.scaladsl.Sink import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } @@ -19,7 +20,7 @@ class InputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) with ScalaF "InputStreamSource" must { "read bytes from InputStream" in assertAllStagesStopped { - val f = Source.inputStream(() ⇒ new InputStream { + val f = StreamConverters.fromInputStream(() ⇒ new InputStream { @volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt) override def read(): Int = { buf match { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala index 70be176b5c..20c1a582e9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala @@ -5,7 +5,7 @@ package akka.stream.io import java.io.OutputStream -import akka.stream.scaladsl.{ Source, Sink } +import akka.stream.scaladsl.{ Source, Sink, StreamConverters } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } @@ -26,7 +26,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val datas = List(ByteString("a"), ByteString("c"), ByteString("c")) val completion = Source(datas) - .runWith(Sink.outputStream(() ⇒ new OutputStream { + .runWith(StreamConverters.fromOutputStream(() ⇒ new OutputStream { override def write(i: Int): Unit = () override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String })) @@ -40,7 +40,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "close underlying stream when error received" in assertAllStagesStopped { val p = TestProbe() Source.failed(new TE("Boom!")) - .runWith(Sink.outputStream(() ⇒ new OutputStream { + .runWith(StreamConverters.fromOutputStream(() ⇒ new OutputStream { override def write(i: Int): Unit = () override def close() = p.ref ! "closed" })) @@ -51,7 +51,7 @@ class OutputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "close underlying stream when completion received" in assertAllStagesStopped { val p = TestProbe() Source.empty - .runWith(Sink.outputStream(() ⇒ new OutputStream { + .runWith(StreamConverters.fromOutputStream(() ⇒ new OutputStream { override def write(i: Int): Unit = () override def write(bytes: Array[Byte]): Unit = p.ref ! ByteString(bytes).utf8String override def close() = p.ref ! "closed" diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala index be470c4430..373175a83b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala @@ -11,7 +11,7 @@ import akka.stream._ import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.OutputStreamSourceStage import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } -import akka.stream.scaladsl.{ Keep, Source } +import akka.stream.scaladsl.{ Keep, Source, StreamConverters } import akka.stream.stage.OutHandler import akka.stream.testkit.Utils._ import akka.stream.testkit._ @@ -70,7 +70,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { "OutputStreamSource" must { "read bytes from OutputStream" in assertAllStagesStopped { - val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val s = probe.expectSubscription() outputStream.write(bytesArray) @@ -81,7 +81,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "block flush call until send all buffer to downstream" in assertAllStagesStopped { - val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val s = probe.expectSubscription() outputStream.write(bytesArray) @@ -99,7 +99,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "not block flushes when buffer is empty" in assertAllStagesStopped { - val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run val s = probe.expectSubscription() outputStream.write(bytesArray) @@ -117,7 +117,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "block writes when buffer is full" in assertAllStagesStopped { - val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both) + val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both) .withAttributes(Attributes.inputBuffer(16, 16)).run val s = probe.expectSubscription() @@ -138,7 +138,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "throw error when write after stream is closed" in assertAllStagesStopped { - val (outputStream, probe) = Source.outputStream().toMat(TestSink.probe[ByteString])(Keep.both).run + val (outputStream, probe) = StreamConverters.asOutputStream().toMat(TestSink.probe[ByteString])(Keep.both).run probe.expectSubscription() outputStream.close() @@ -151,7 +151,7 @@ class OutputStreamSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val materializer = ActorMaterializer()(sys) try { - Source.outputStream().runWith(TestSink.probe[ByteString])(materializer) + StreamConverters.asOutputStream().runWith(TestSink.probe[ByteString])(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "outputStreamSource").get assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala new file mode 100644 index 0000000000..35349c70f4 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.javadsl + +import java.io.{ InputStream, OutputStream, File } + +import akka.japi.function +import akka.stream.{ scaladsl, javadsl, ActorAttributes } +import akka.util.ByteString + +import scala.concurrent.Future + +/** + * Factories to create sinks and sources from files + */ +object FileIO { + + /** + * Creates a Sink that writes incoming [[ByteString]] elements to the given file. + * Overwrites existing files, if you want to append to an existing file use [[#file(File, Boolean)]] and + * pass in `true` as the Boolean argument. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f The file to write to + */ + def toFile(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = toFile(f, append = false) + + /** + * Creates a Sink that writes incoming [[ByteString]] elements to the given file and either overwrites + * or appends to it. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f The file to write to + * @param append Whether or not the file should be overwritten or appended to + */ + def toFile(f: File, append: Boolean): javadsl.Sink[ByteString, Future[java.lang.Long]] = + new Sink(scaladsl.FileIO.toFile(f, append)).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] + + /** + * Creates a Source from a Files contents. + * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, + * except the last element, which will be up to 8192 in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def fromFile(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = fromFile(f, 8192) + + /** + * Creates a synchronous (Java 6 compatible) Source from a Files contents. + * Emitted elements are `chunkSize` sized [[ByteString]] elements, + * except the last element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def fromFile(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = + new Source(scaladsl.FileIO.fromFile(f, chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] + +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 0b3b72a018..2865f06efe 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -212,75 +212,6 @@ object Sink { def queue[T](bufferSize: Int, timeout: FiniteDuration): Sink[T, SinkQueue[T]] = new Sink(scaladsl.Sink.queue(bufferSize, timeout)) - /** - * Creates a Sink that writes incoming [[ByteString]] elements to the given file. - * Overwrites existing files, if you want to append to an existing file use [[#file(File, Boolean)]] and - * pass in `true` as the Boolean argument. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * @param f The file to write to - */ - def file(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = file(f, append = false) - - /** - * Creates a Sink that writes incoming [[ByteString]] elements to the given file and either overwrites - * or appends to it. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * @param f The file to write to - * @param append Whether or not the file should be overwritten or appended to - */ - def file(f: File, append: Boolean): javadsl.Sink[ByteString, Future[java.lang.Long]] = - new Sink(scaladsl.Sink.file(f, append)).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] - - /** - * Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * @param f A Creator which creates an OutputStream to write to - */ - def outputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] = - new Sink(scaladsl.Sink.outputStream(() ⇒ f.create())).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] - - /** - * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible - * to read the values produced by the stream this Sink is attached to. - * - * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] to explicitly - * configure the timeout. - * - * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - */ - def inputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.Sink.inputStream()) - - /** - * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible - * to read the values produced by the stream this Sink is attached to. - * - * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * @param readTimeout the max time the read operation on the materialized InputStream should block - */ - def inputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] = - new Sink(scaladsl.Sink.inputStream(readTimeout)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c28ffdf5e6..bf1940772c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -288,82 +288,6 @@ object Source { def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration): Source[T, SourceQueue[T]] = new Source(scaladsl.Source.queue(bufferSize, overflowStrategy, timeout)) - /** - * Creates a Source from a Files contents. - * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, - * except the last element, which will be up to 8192 in size. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def file(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = file(f, 8192) - - /** - * Creates a synchronous (Java 6 compatible) Source from a Files contents. - * Emitted elements are `chunkSize` sized [[ByteString]] elements, - * except the last element, which will be up to `chunkSize` in size. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def file(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = - new Source(scaladsl.Source.file(f, chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] - - /** - * Creates a Source from an [[java.io.InputStream]] created by the given function. - * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, - * except the final element, which will be up to `chunkSize` in size. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def inputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = - new Source(scaladsl.Source.inputStream(() ⇒ in.create(), chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] - - /** - * Creates a Source from an [[java.io.InputStream]] created by the given function. - * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, - * except the last element, which will be up to 8192 in size. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def inputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] = inputStream(in, 8192) - - /** - * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible - * to write the ByteStrings to the stream this Source is attached to. - * - * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * @param writeTimeout the max time the write operation on the materialized OutputStream should block - */ - def outputStream(writeTimeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = - new Source(scaladsl.Source.outputStream(writeTimeout)) - - /** - * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible - * to write the ByteStrings to the stream this Source is attached to. The write timeout for OutputStreams - * materialized will default to 5 seconds, @see [[#outputStream(FiniteDuration)]] if you want to override it. - * - * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - */ - def outputStream(): javadsl.Source[ByteString, OutputStream] = - new Source(scaladsl.Source.outputStream()) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala new file mode 100644 index 0000000000..64ae0ade08 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.javadsl + +import java.io.{ InputStream, OutputStream } + +import akka.japi.function +import akka.stream.{ scaladsl, javadsl, ActorAttributes } +import akka.util.ByteString + +import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration + +/** + * Converters for interacting with the blocking `java.io` streams APIs + */ +object StreamConverters { + /** + * Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f A Creator which creates an OutputStream to write to + */ + def fromOutputStream(f: function.Creator[OutputStream]): javadsl.Sink[ByteString, Future[java.lang.Long]] = + new Sink(scaladsl.StreamConverters.fromOutputStream(() ⇒ f.create())).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] + + /** + * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible + * to read the values produced by the stream this Sink is attached to. + * + * This method uses a default read timeout, use [[#inputStream(FiniteDuration)]] to explicitly + * configure the timeout. + * + * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def asInputStream(): Sink[ByteString, InputStream] = new Sink(scaladsl.StreamConverters.asInputStream()) + + /** + * Creates a Sink which when materialized will return an [[java.io.InputStream]] which it is possible + * to read the values produced by the stream this Sink is attached to. + * + * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param readTimeout the max time the read operation on the materialized InputStream should block + */ + def asInputStream(readTimeout: FiniteDuration): Sink[ByteString, InputStream] = + new Sink(scaladsl.StreamConverters.asInputStream(readTimeout)) + + /** + * Creates a Source from an [[java.io.InputStream]] created by the given function. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def fromInputStream(in: function.Creator[InputStream], chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = + new Source(scaladsl.StreamConverters.fromInputStream(() ⇒ in.create(), chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] + + /** + * Creates a Source from an [[java.io.InputStream]] created by the given function. + * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, + * except the last element, which will be up to 8192 in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def fromInputStream(in: function.Creator[InputStream]): javadsl.Source[ByteString, Future[java.lang.Long]] = fromInputStream(in, 8192) + + /** + * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible + * to write the ByteStrings to the stream this Source is attached to. + * + * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param writeTimeout the max time the write operation on the materialized OutputStream should block + */ + def asOutputStream(writeTimeout: FiniteDuration): javadsl.Source[ByteString, OutputStream] = + new Source(scaladsl.StreamConverters.asOutputStream(writeTimeout)) + + /** + * Creates a Source which when materialized will return an [[java.io.OutputStream]] which it is possible + * to write the ByteStrings to the stream this Source is attached to. The write timeout for OutputStreams + * materialized will default to 5 seconds, @see [[#outputStream(FiniteDuration)]] if you want to override it. + * + * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def asOutputStream(): javadsl.Source[ByteString, OutputStream] = + new Source(scaladsl.StreamConverters.asOutputStream()) + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala new file mode 100644 index 0000000000..82006c58d8 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import java.io.{ OutputStream, InputStream, File } + +import akka.stream.ActorAttributes +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.io._ +import akka.util.ByteString + +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** + * Java API: Factories to create sinks and sources from files + */ +object FileIO { + + import Source.{ shape ⇒ sourceShape } + import Sink.{ shape ⇒ sinkShape } + + /** + * Creates a Source from a Files contents. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * + * @param f the File to read from + * @param chunkSize the size of each read operation, defaults to 8192 + */ + def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = + new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, sourceShape("FileSource"))) + + /** + * Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites + * or appends to it. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, + * unless configured otherwise by using [[ActorAttributes]]. + */ + def toFile(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = + new Sink(new FileSink(f, append, DefaultAttributes.fileSink, sinkShape("FileSink"))) + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index eee4cfce87..ede165a213 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -281,40 +281,4 @@ object Sink { new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout)) } - /** - * Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites - * or appends to it. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. - */ - def file(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = - new Sink(new FileSink(f, append, DefaultAttributes.fileSink, shape("FileSink"))) - - /** - * Creates a Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - */ - def outputStream(out: () ⇒ OutputStream): Sink[ByteString, Future[Long]] = - new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, shape("OutputStreamSink"))) - - /** - * Creates a Sink which when materialized will return an [[InputStream]] which it is possible - * to read the values produced by the stream this Sink is attached to. - * - * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * @param readTimeout the max time the read operation on the materialized InputStream should block - */ - def inputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] = - Sink.fromGraph(new InputStreamSinkStage(readTimeout)).withAttributes(DefaultAttributes.inputStreamSink) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7d4479dc93..cb648c947d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -425,50 +425,4 @@ object Source { new Source(new AcknowledgeSource(bufferSize, overflowStrategy, DefaultAttributes.acknowledgeSource, shape("AcknowledgeSource"))) } - /** - * Creates a Source from a Files contents. - * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, - * except the final element, which will be up to `chunkSize` in size. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - * - * @param f the File to read from - * @param chunkSize the size of each read operation, defaults to 8192 - */ - def file(f: File, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = - new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, shape("FileSource"))) - - /** - * Creates a Source from an [[InputStream]] created by the given function. - * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, - * except the final element, which will be up to `chunkSize` in size. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - * - * @param in a function which creates the InputStream to read from - * @param chunkSize the size of each read operation, defaults to 8192 - */ - def inputStream(in: () ⇒ InputStream, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = - new Source(new InputStreamSource(in, chunkSize, DefaultAttributes.inputStreamSource, shape("InputStreamSource"))) - - /** - * Creates a Source which when materialized will return an [[OutputStream]] which it is possible - * to write the ByteStrings to the stream this Source is attached to. - * - * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. - * - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds - */ - def outputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] = - Source.fromGraph(new OutputStreamSourceStage(writeTimeout)).withAttributes(DefaultAttributes.outputStreamSource) - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala new file mode 100644 index 0000000000..32f5c34476 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import java.io.{ OutputStream, InputStream } + +import akka.stream.ActorAttributes +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.io.{ InputStreamSinkStage, OutputStreamSink, OutputStreamSourceStage, InputStreamSource } +import akka.util.ByteString + +import scala.concurrent.Future +import scala.concurrent.duration._ + +/** + * Converters for interacting with the blocking `java.io` streams APIs + */ +object StreamConverters { + + import Source.{ shape ⇒ sourceShape } + import Sink.{ shape ⇒ sinkShape } + + /** + * Creates a Source from an [[InputStream]] created by the given function. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + * + * @param in a function which creates the InputStream to read from + * @param chunkSize the size of each read operation, defaults to 8192 + */ + def fromInputStream(in: () ⇒ InputStream, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = + new Source(new InputStreamSource(in, chunkSize, DefaultAttributes.inputStreamSource, sourceShape("InputStreamSource"))) + + /** + * Creates a Source which when materialized will return an [[OutputStream]] which it is possible + * to write the ByteStrings to the stream this Source is attached to. + * + * This Source is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param writeTimeout the max time the write operation on the materialized OutputStream should block, defaults to 5 seconds + */ + def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream] = + Source.fromGraph(new OutputStreamSourceStage(writeTimeout)).withAttributes(DefaultAttributes.outputStreamSource) + + /** + * Creates a Sink which writes incoming [[ByteString]]s to an [[OutputStream]] created by the given function. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def fromOutputStream(out: () ⇒ OutputStream): Sink[ByteString, Future[Long]] = + new Sink(new OutputStreamSink(out, DefaultAttributes.outputStreamSink, sinkShape("OutputStreamSink"))) + + /** + * Creates a Sink which when materialized will return an [[InputStream]] which it is possible + * to read the values produced by the stream this Sink is attached to. + * + * This Sink is intended for inter-operation with legacy APIs since it is inherently blocking. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param readTimeout the max time the read operation on the materialized InputStream should block + */ + def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream] = + Sink.fromGraph(new InputStreamSinkStage(readTimeout)).withAttributes(DefaultAttributes.inputStreamSink) + +}