diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala index 100c15de09..6277e0d02c 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -36,7 +36,7 @@ class FileSourcesBenchmark { val ft = Source(() ⇒ Iterator.continually(line)) .take(10 * 39062) // adjust as needed - .runWith(SynchronousFileSink(f)) + .runWith(Sink.file(f)) Await.result(ft, 30.seconds) f @@ -51,7 +51,7 @@ class FileSourcesBenchmark { @Setup def setup() { - fileChannelSource = SynchronousFileSource(file, bufSize) + fileChannelSource = Source.file(file, bufSize) fileInputStreamSource = InputStreamSource(() ⇒ new FileInputStream(file), bufSize) ioSourceLinesIterator = Source(() ⇒ scala.io.Source.fromFile(file).getLines()).map(ByteString(_)) } diff --git a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java index c37732131a..8200b9e549 100644 --- a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java +++ b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java @@ -6,11 +6,14 @@ import akka.japi.Pair; import akka.japi.function.Function; import akka.stream.*; import akka.stream.javadsl.*; +import akka.util.ByteString; import scala.Option; +import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import scala.concurrent.Promise; import scala.runtime.BoxedUnit; +import java.io.File; import java.util.concurrent.TimeUnit; import java.nio.charset.Charset; @@ -146,6 +149,17 @@ public class MigrationsJava { //#query-param final akka.japi.Option aQueryParam = uri.query().get("a"); //#query-param + + //#file-source-sink + final Source> fileSrc = + Source.file(new File(".")); + + final Source> otherFileSrc = + Source.file(new File("."), 1024); + + final Sink> fileSink = + Sink.file(new File(".")); + //#file-source-sink } } diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index e3ea10c549..4b847422d9 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -420,3 +420,39 @@ And use of query parameters from ``Uri`` that looked like this: should be replaced by: .. includecode:: code/docs/MigrationsJava.java#query-param + +SynchronousFileSource and SynchronousFileSink +============================================ + +Both have been replaced by `Source.file(…)` and `Sink.file(…)` due to discoverability issues +paired with names which leaked internal implementation details. + +Update procedure +---------------- + +Replace `SynchronousFileSource.create(` with `Source.file(` + +Replace `SynchronousFileSink.create(` with `Sink.file(` + +Replace `SynchronousFileSink.appendTo(f)` with `Sink.file(f, true)` + +Example +^^^^^^^ + +:: + + // This no longer works! + final Source> src = + SynchronousFileSource.create(new File(".")); + + // This no longer works! + final Source> src = + SynchronousFileSource.create(new File("."), 1024); + + // This no longer works! + final Sink> sink = + `SynchronousFileSink.appendTo(new File(".")); + +should be replaced by + +.. includecode:: code/docs/MigrationsJava.java#file-source-sink \ No newline at end of file diff --git a/akka-docs-dev/rst/java/stream-io.rst b/akka-docs-dev/rst/java/stream-io.rst index 71f6a8b9c3..7736ddf41d 100644 --- a/akka-docs-dev/rst/java/stream-io.rst +++ b/akka-docs-dev/rst/java/stream-io.rst @@ -110,7 +110,7 @@ on files. Once Akka is free to require JDK8 (from ``2.4.x``) these implementations will be updated to make use of the new NIO APIs (i.e. :class:`AsynchronousFileChannel`). -Streaming data from a file is as easy as defining a `SynchronousFileSource` given a target file, and an optional +Streaming data from a file is as easy as creating a `Source.file` given a target file, and an optional ``chunkSize`` which determines the buffer size determined as one "element" in such stream: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/io/StreamFileDocTest.java#file-source diff --git a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala index 8c973030e2..b97b0f24a8 100644 --- a/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala +++ b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala @@ -1,5 +1,7 @@ package docs +import java.io.File + import akka.http.scaladsl.model.Uri import akka.stream.scaladsl._ import akka.stream._ @@ -208,6 +210,14 @@ class MigrationsScala extends AkkaSpec { //#query-param val param: Option[String] = uri.query().get("a") //#query-param + + //#file-source-sink + val fileSrc = Source.file(new File(".")) + + val otherFileSrc = Source.file(new File("."), 1024) + + val someFileSink = Sink.file(new File(".")) + //#file-source-sink } } } diff --git a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala index 917692f6ad..99aa5cd093 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala @@ -13,7 +13,6 @@ import akka.http.scaladsl.model.headers.{ Server, RawHeader } import akka.http.scaladsl.server.RouteResult.{ Complete, Rejected } import akka.http.scaladsl.server._ import akka.stream.ActorMaterializer -import akka.stream.io.SynchronousFileSource import akka.stream.scaladsl.{ Sink, Source } import akka.util.ByteString @@ -174,7 +173,7 @@ class BasicDirectivesExamplesSpec extends RoutingSpec { path("sample") { complete { // internally uses the configured fileIODispatcher: - val source = SynchronousFileSource(new File("example.json")) + val source = Source.file(new File("example.json")) HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, source)) } } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala index 4979d30e2b..5ddec38017 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -6,8 +6,6 @@ package docs.stream.io import java.io.File import akka.stream._ -import akka.stream.io.SynchronousFileSink -import akka.stream.io.SynchronousFileSource import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.stream.testkit.Utils._ @@ -48,7 +46,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { //#file-source - val foreach: Future[Long] = SynchronousFileSource(file) + val foreach: Future[Long] = Source.file(file) .to(Sink.ignore) .run() //#file-source @@ -56,16 +54,8 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { "configure dispatcher in code" in { //#custom-dispatcher-code - SynchronousFileSink(file) + Sink.file(file) .withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher")) //#custom-dispatcher-code } - - "show Implicits" in { - //#source-sink-implicits - import akka.stream.io.Implicits._ - - Source.synchronousFile(file) to Sink.outputStream(() ⇒ System.out) - //#source-sink-implicits - } } diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromDirectory.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromDirectory.rst index dac8ea1816..f5db74de59 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromDirectory.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromDirectory.rst @@ -24,10 +24,10 @@ To serve files from a classpath directory use :ref:`-getFromResourceDirectory-` Note that it's not required to wrap this directive with ``get`` as this directive will only respond to ``GET`` requests. .. note:: - The file's contents will be read using an Akka Streams :class:`SynchronousFileSource` which *automatically uses + The file's contents will be read using an Akka Streams `Source` which *automatically uses a pre-configured dedicated blocking io dispatcher*, which separates the blocking file operations from the rest of the stream. - Note also that thanks to using Akka Streams internally, the file will be served at the highest spead reachable by + Note also that thanks to using Akka Streams internally, the file will be served at the highest speed reachable by the client, and not faster – i.e. the file will *not* end up being loaded in full into memory before writing it to the client. diff --git a/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromFile.rst b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromFile.rst index a0c43b4d51..f9734087eb 100644 --- a/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromFile.rst +++ b/akka-docs-dev/rst/scala/http/routing-dsl/directives/file-and-resource-directives/getFromFile.rst @@ -24,10 +24,10 @@ To serve files from a classpath directory use :ref:`-getFromResourceDirectory-` Note that it's not required to wrap this directive with ``get`` as this directive will only respond to ``GET`` requests. .. note:: - The file's contents will be read using an Akka Streams :class:`SynchronousFileSource` which *automatically uses + The file's contents will be read using an Akka Streams `Source` which *automatically uses a pre-configured dedicated blocking io dispatcher*, which separates the blocking file operations from the rest of the stream. - Note also that thanks to using Akka Streams internally, the file will be served at the highest spead reachable by + Note also that thanks to using Akka Streams internally, the file will be served at the highest speed reachable by the client, and not faster – i.e. the file will *not* end up being loaded in full into memory before writing it to the client. diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index 69e66861ad..f123e8e622 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -450,3 +450,34 @@ And use of query parameters from ``Uri`` that looked like this: should be replaced by: .. includecode:: code/docs/MigrationsScala.scala#query-param + +SynchronousFileSource and SynchronousFileSink +============================================ + +Both have been replaced by `Source.file(…)` and `Sink.file(…)` due to discoverability issues +paired with names which leaked internal implementation details. + +Update procedure +---------------- + +Replace `SynchronousFileSource(` and `SynchronousFileSource.apply(` with `Source.file(` + +Replace `SynchronousFileSink(` and `SynchronousFileSink.apply(` with `Sink.file(` + +Example +^^^^^^^ + +:: + + // This no longer works! + val fileSrc = SynchronousFileSource(new File(".")) + + // This no longer works! + val otherFileSrc = SynchronousFileSource(new File("."), 1024) + + // This no longer works! + val someFileSink = SynchronousFileSink(new File(".")) + +should be replaced by + +.. includecode:: code/docs/MigrationsScala.scala#file-source-sink diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst index 1cb32c044a..3693d163f5 100644 --- a/akka-docs-dev/rst/scala/stream-io.rst +++ b/akka-docs-dev/rst/scala/stream-io.rst @@ -110,7 +110,7 @@ on files. Once Akka is free to require JDK8 (from ``2.4.x``) these implementations will be updated to make use of the new NIO APIs (i.e. :class:`AsynchronousFileChannel`). -Streaming data from a file is as easy as defining a `SynchronousFileSource` given a target file, and an optional +Streaming data from a file is as easy as creating a `Source.file` given a target file, and an optional ``chunkSize`` which determines the buffer size determined as one "element" in such stream: .. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#file-source @@ -122,9 +122,3 @@ dispatcher for file IO operations globally, you can do so by changing the ``akka or for a specific stage by specifying a custom Dispatcher in code, like this: .. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#custom-dispatcher-code - -If you would like to keep all sink and source factories defined on the :class:`Source` and :class:`Sink` objects -instead of using the separate objects contained in ``akka.stream.io`` to create these you can import an *implicit -coversion* that makes these operations available as shown below: - -.. includecode:: code/docs/stream/io/StreamFileDocSpec.scala#source-sink-implicits diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index 7d7a9bbdf2..83aea6087c 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -12,7 +12,6 @@ import scala.concurrent.duration._ import scala.collection.immutable import akka.util.ByteString import akka.stream.scaladsl._ -import akka.stream.io.SynchronousFileSource import akka.stream.stage._ import akka.stream._ import akka.{ japi, stream } @@ -120,8 +119,8 @@ sealed trait BodyPartEntity extends HttpEntity with jm.BodyPartEntity { def withContentType(contentType: ContentType): BodyPartEntity /** - * See [[HttpEntity#withSizeLimit]]. - */ + * See [[HttpEntity#withSizeLimit]]. + */ def withSizeLimit(maxBytes: Long): BodyPartEntity } @@ -134,8 +133,8 @@ sealed trait RequestEntity extends HttpEntity with jm.RequestEntity with Respons def withContentType(contentType: ContentType): RequestEntity /** - * See [[HttpEntity#withSizeLimit]]. - */ + * See [[HttpEntity#withSizeLimit]]. + */ def withSizeLimit(maxBytes: Long): RequestEntity def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): RequestEntity @@ -150,8 +149,8 @@ sealed trait ResponseEntity extends HttpEntity with jm.ResponseEntity { def withContentType(contentType: ContentType): ResponseEntity /** - * See [[HttpEntity#withSizeLimit]]. - */ + * See [[HttpEntity#withSizeLimit]]. + */ def withSizeLimit(maxBytes: Long): ResponseEntity def transformDataBytes(transformer: Flow[ByteString, ByteString, Any]): ResponseEntity @@ -161,8 +160,8 @@ sealed trait UniversalEntity extends jm.UniversalEntity with MessageEntity with def withContentType(contentType: ContentType): UniversalEntity /** - * See [[HttpEntity#withSizeLimit]]. - */ + * See [[HttpEntity#withSizeLimit]]. + */ def withSizeLimit(maxBytes: Long): UniversalEntity def contentLength: Long @@ -195,7 +194,7 @@ object HttpEntity { val fileLength = file.length if (fileLength > 0) Default(contentType, fileLength, - if (chunkSize > 0) SynchronousFileSource(file, chunkSize) else SynchronousFileSource(file)) + if (chunkSize > 0) Source.file(file, chunkSize) else Source.file(file)) else empty(contentType) } @@ -233,8 +232,8 @@ object HttpEntity { if (contentType == this.contentType) this else copy(contentType = contentType) /** - * See [[HttpEntity#withSizeLimit]]. - */ + * See [[HttpEntity#withSizeLimit]]. + */ def withSizeLimit(maxBytes: Long): UniversalEntity = if (data.length <= maxBytes) this else Default(contentType, data.length, limitableByteSource(Source.single(data))) withSizeLimit maxBytes @@ -265,8 +264,8 @@ object HttpEntity { if (contentType == this.contentType) this else copy(contentType = contentType) /** - * See [[HttpEntity#withSizeLimit]]. - */ + * See [[HttpEntity#withSizeLimit]]. + */ def withSizeLimit(maxBytes: Long): Default = copy(data = data withAttributes Attributes(SizeLimit(maxBytes, Some(contentLength)))) @@ -287,8 +286,8 @@ object HttpEntity { def dataBytes: Source[ByteString, Any] = data /** - * See [[HttpEntity#withSizeLimit]]. - */ + * See [[HttpEntity#withSizeLimit]]. + */ def withSizeLimit(maxBytes: Long): Self = withData(data withAttributes Attributes(SizeLimit(maxBytes))) diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala index cff907f653..774aac7619 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala @@ -15,7 +15,6 @@ import scala.concurrent.{ Future, ExecutionContext } import scala.collection.immutable import scala.util.{ Failure, Success, Try } import akka.stream.Materializer -import akka.stream.io.SynchronousFileSource import akka.stream.scaladsl.{ Source } import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.model.headers._ @@ -236,7 +235,7 @@ object Multipart { } /** - * Creates a BodyPart backed by a File that will be streamed using a SynchronousFileSource. + * Creates a BodyPart backed by a File that will be streamed using a FileSource. */ def fromFile(name: String, contentType: ContentType, file: File, chunkSize: Int = -1): BodyPart = BodyPart(name, HttpEntity(contentType, file, chunkSize), Map("filename" -> file.getName)) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala index 5e3b23f36b..d0663088a5 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileAndResourceDirectives.scala @@ -9,7 +9,8 @@ import java.io.File import java.net.{ URI, URL } import akka.stream.ActorAttributes -import akka.stream.io.{ InputStreamSource, SynchronousFileSource } +import akka.stream.io.{ InputStreamSource } +import akka.stream.scaladsl.Source import scala.annotation.tailrec import akka.actor.ActorSystem @@ -54,7 +55,7 @@ trait FileAndResourceDirectives { extractSettings { settings ⇒ complete { HttpEntity.Default(contentType, file.length, - SynchronousFileSource(file) + Source.file(file) .withAttributes(ActorAttributes.dispatcher(settings.fileIODispatcher))) } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala similarity index 84% rename from akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala rename to akka-stream-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala index 2fa3b857e8..f8dd790fb0 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SynchronousFilePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala @@ -7,8 +7,7 @@ import java.io.{ File, FileWriter } import akka.actor.ActorSystem import akka.event.Logging -import akka.stream.io.SynchronousFileSource -import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.{ Source, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.testkit.{ EventFilter, TestEvent } @@ -16,7 +15,7 @@ import akka.util.ByteString import org.reactivestreams.Publisher import org.testng.annotations.{ AfterClass, BeforeClass } -class SynchronousFilePublisherTest extends AkkaPublisherVerification[ByteString] { +class FilePublisherTest extends AkkaPublisherVerification[ByteString] { val ChunkSize = 256 val Elements = 1000 @@ -37,7 +36,7 @@ class SynchronousFilePublisherTest extends AkkaPublisherVerification[ByteString] } def createPublisher(elements: Long): Publisher[ByteString] = - SynchronousFileSource(file, chunkSize = 512) + Source.file(file, chunkSize = 512) .take(elements) .runWith(Sink.publisher(false)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala similarity index 92% rename from akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 18e6aad9d0..eec2c67ff6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -9,7 +9,7 @@ import akka.actor.ActorSystem import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ Sink, Source } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.StreamTestKit @@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration._ -class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { +class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") implicit val materializer = ActorMaterializer(settings) @@ -45,7 +45,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "write lines to a file" in assertAllStagesStopped { targetFile { f ⇒ val completion = Source(TestByteStrings) - .runWith(SynchronousFileSink(f)) + .runWith(Sink.file(f)) val size = Await.result(completion, 3.seconds) size should equal(6006) @@ -58,7 +58,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { def write(lines: List[String]) = Source(lines) .map(ByteString(_)) - .runWith(SynchronousFileSink(f)) + .runWith(Sink.file(f)) val completion1 = write(TestLines) Await.result(completion1, 3.seconds) @@ -77,7 +77,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { def write(lines: List[String] = TestLines) = Source(lines) .map(ByteString(_)) - .runWith(SynchronousFileSink(f, append = true)) + .runWith(Sink.file(f, append = true)) val completion1 = write() val written1 = Await.result(completion1, 3.seconds) @@ -98,10 +98,10 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(3.seconds) try { - Source(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(SynchronousFileSink(f))(mat) + Source(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(Sink.file(f))(mat) mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) } @@ -117,7 +117,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { try { Source(() ⇒ Iterator.continually(TestByteStrings.head)) - .to(SynchronousFileSink(f)) + .to(Sink.file(f)) .withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")) .run()(mat) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala similarity index 90% rename from akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index e2ff970263..ce9feefb9b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -15,8 +15,8 @@ import akka.stream.Attributes import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children -import akka.stream.io.SynchronousFileSourceSpec.Settings -import akka.stream.scaladsl.Sink +import akka.stream.io.FileSourceSpec.Settings +import akka.stream.scaladsl.{ Source, Sink } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink @@ -26,11 +26,11 @@ import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ -object SynchronousFileSourceSpec { +object FileSourceSpec { final case class Settings(chunkSize: Int, readAhead: Int) } -class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { +class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") implicit val materializer = ActorMaterializer(settings) @@ -74,7 +74,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val chunkSize = 512 val bufferAttributes = Attributes.inputBuffer(1, 2) - val p = SynchronousFileSource(testFile, chunkSize) + val p = Source.file(testFile, chunkSize) .withAttributes(bufferAttributes) .runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[ByteString]() @@ -111,7 +111,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val demandAllButOneChunks = TestText.length / chunkSize - 1 - val p = SynchronousFileSource(testFile, chunkSize) + val p = Source.file(testFile, chunkSize) .withAttributes(bufferAttributes) .runWith(Sink.publisher(false)) @@ -140,7 +140,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "onError whent trying to read from file which does not exist" in assertAllStagesStopped { - val p = SynchronousFileSource(notExistingFile).runWith(Sink.publisher(false)) + val p = Source.file(notExistingFile).runWith(Sink.publisher(false)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) @@ -156,7 +156,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { import settings._ s"count lines in real file (chunkSize = $chunkSize, readAhead = $readAhead)" in { - val s = SynchronousFileSource(manyLines, chunkSize = chunkSize) + val s = Source.file(manyLines, chunkSize = chunkSize) .withAttributes(Attributes.inputBuffer(readAhead, readAhead)) val f = s.runWith(Sink.fold(0) { case (acc, l) ⇒ acc + l.utf8String.count(_ == '\n') }) @@ -172,10 +172,10 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(500.millis) try { - val p = SynchronousFileSource(manyLines).runWith(TestSink.probe)(mat) + val p = Source.file(manyLines).runWith(TestSink.probe)(mat) mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get + val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel() } finally shutdown(sys) } @@ -188,7 +188,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(500.millis) try { - val p = SynchronousFileSource(manyLines) + val p = Source.file(manyLines) .withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")) .runWith(TestSink.probe)(mat) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 6a0e9d9fcc..06b5832f84 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -23,6 +23,8 @@ import scala.collection.immutable private[stream] object Stages { object DefaultAttributes { + val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") + val timerTransform = name("timerTransform") val stageFactory = name("stageFactory") val fused = name("fused") @@ -79,10 +81,10 @@ private[stream] object Stages { val subscriberSource = name("subscriberSource") val actorPublisherSource = name("actorPublisherSource") val actorRefSource = name("actorRefSource") - val synchronousFileSource = name("synchronousFileSource") val inputStreamSource = name("inputStreamSource") val acknowledgeSource = name("acknowledgeSource") val outputStreamSource = name("outputStreamSource") + val fileSource = name("fileSource") and IODispatcher val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") @@ -92,10 +94,10 @@ private[stream] object Stages { val ignoreSink = name("ignoreSink") val actorRefSink = name("actorRefSink") val actorSubscriberSink = name("actorSubscriberSink") - val synchronousFileSink = name("synchronousFileSink") val outputStreamSink = name("outputStreamSink") val acknowledgeSink = name("acknowledgeSink") val inputStreamSink = name("inputStreamSink") + val fileSink = name("fileSource") and IODispatcher } import DefaultAttributes._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala similarity index 91% rename from akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFilePublisher.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index 2ebb07bcde..993d73c9a5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -16,13 +16,13 @@ import scala.concurrent.Promise import scala.util.control.NonFatal /** INTERNAL API */ -private[akka] object SynchronousFilePublisher { +private[akka] object FilePublisher { def props(f: File, completionPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") - Props(classOf[SynchronousFilePublisher], f, completionPromise, chunkSize, initialBuffer, maxBuffer) + Props(classOf[FilePublisher], f, completionPromise, chunkSize, initialBuffer, maxBuffer) .withDeploy(Deploy.local) } @@ -31,9 +31,9 @@ private[akka] object SynchronousFilePublisher { } /** INTERNAL API */ -private[akka] final class SynchronousFilePublisher(f: File, bytesReadPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) +private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { - import SynchronousFilePublisher._ + import FilePublisher._ var eofReachedAtOffset = Long.MinValue diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala similarity index 84% rename from akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFileSubscriber.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 94f1d8c0cd..b282e524e3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SynchronousFileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -13,16 +13,16 @@ import akka.util.ByteString import scala.concurrent.Promise /** INTERNAL API */ -private[akka] object SynchronousFileSubscriber { +private[akka] object FileSubscriber { def props(f: File, completionPromise: Promise[Long], bufSize: Int, append: Boolean) = { require(bufSize > 0, "buffer size must be > 0") - Props(classOf[SynchronousFileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local) + Props(classOf[FileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local) } } /** INTERNAL API */ -private[akka] class SynchronousFileSubscriber(f: File, bytesWrittenPromise: Promise[Long], bufSize: Int, append: Boolean) +private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long], bufSize: Int, append: Boolean) extends akka.stream.actor.ActorSubscriber with ActorLogging { @@ -58,7 +58,7 @@ private[akka] class SynchronousFileSubscriber(f: File, bytesWrittenPromise: Prom } case ActorSubscriberMessage.OnError(cause) ⇒ - log.error(cause, "Tearing down SynchronousFileSink({}) due to upstream error", f.getAbsolutePath) + log.error(cause, "Tearing down FileSink({}) due to upstream error", f.getAbsolutePath) context.stop(self) case ActorSubscriberMessage.OnComplete ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala deleted file mode 100644 index 51dedd0622..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala +++ /dev/null @@ -1,12 +0,0 @@ -package akka.stream.impl.io - -import akka.stream.ActorAttributes -import akka.stream.Attributes - -private[stream] object IOSettings { - - final val SyncFileSourceDefaultChunkSize = 8192 - final val SyncFileSourceName = Attributes.name("synchronousFileSource") - final val SyncFileSinkName = Attributes.name("synchronousFileSink") - final val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index 313d552619..787debe431 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -6,6 +6,7 @@ package akka.stream.impl.io import java.io.{ File, OutputStream } import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.{ ActorMaterializer, MaterializationContext, Attributes, SinkShape } import akka.stream.ActorAttributes.Dispatcher import akka.util.ByteString @@ -16,7 +17,7 @@ import scala.concurrent.{ Future, Promise } * Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file * (creating it before hand if necessary). */ -private[akka] final class SynchronousFileSink(f: File, append: Boolean, val attributes: Attributes, shape: SinkShape[ByteString]) +private[akka] final class FileSink(f: File, append: Boolean, val attributes: Attributes, shape: SinkShape[ByteString]) extends SinkModule[ByteString, Future[Long]](shape) { override def create(context: MaterializationContext) = { @@ -24,18 +25,18 @@ private[akka] final class SynchronousFileSink(f: File, append: Boolean, val attr val settings = mat.effectiveSettings(context.effectiveAttributes) val bytesWrittenPromise = Promise[Long]() - val props = SynchronousFileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append) - val dispatcher = context.effectiveAttributes.get[Dispatcher](IOSettings.IODispatcher).dispatcher + val props = FileSubscriber.props(f, bytesWrittenPromise, settings.maxInputBufferSize, append) + val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher val ref = mat.actorOf(context, props.withDispatcher(dispatcher)) (akka.stream.actor.ActorSubscriber[ByteString](ref), bytesWrittenPromise.future) } override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[Long]] = - new SynchronousFileSink(f, append, attributes, shape) + new FileSink(f, append, attributes, shape) override def withAttributes(attr: Attributes): Module = - new SynchronousFileSink(f, append, attr, amendShape(attr)) + new FileSink(f, append, attr, amendShape(attr)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index eef2ac9e16..759160709d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -3,38 +3,32 @@ */ package akka.stream.impl.io -import java.io.{ File, IOException, InputStream, OutputStream } -import java.lang.{ Long ⇒ JLong } -import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue } +import java.io.{ File, InputStream } -import akka.actor.{ ActorRef, Deploy } -import akka.japi import akka.stream._ import akka.stream.ActorAttributes.Dispatcher import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.{ ErrorPublisher, SourceModule } -import akka.stream.scaladsl.{ Source, FlowGraph } -import akka.util.{ ByteString, Timeout } +import akka.util.ByteString import org.reactivestreams._ - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, Future, Promise } -import scala.util.control.NonFatal +import scala.concurrent.{ Future, Promise } /** * INTERNAL API * Creates simple synchronous (Java 6 compatible) Source backed by the given file. */ -private[akka] final class SynchronousFileSource(f: File, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) +private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) extends SourceModule[ByteString, Future[Long]](shape) { + require(chunkSize > 0, "chunkSize must be greater than 0") override def create(context: MaterializationContext) = { // FIXME rewrite to be based on GraphStage rather than dangerous downcasts val mat = ActorMaterializer.downcast(context.materializer) val settings = mat.effectiveSettings(context.effectiveAttributes) val bytesReadPromise = Promise[Long]() - val props = SynchronousFilePublisher.props(f, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) - val dispatcher = context.effectiveAttributes.get[Dispatcher](IOSettings.IODispatcher).dispatcher + val props = FilePublisher.props(f, bytesReadPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) + val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher val ref = mat.actorOf(context, props.withDispatcher(dispatcher)) @@ -42,10 +36,10 @@ private[akka] final class SynchronousFileSource(f: File, chunkSize: Int, val att } override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[Long]] = - new SynchronousFileSource(f, chunkSize, attributes, shape) + new FileSource(f, chunkSize, attributes, shape) override def withAttributes(attr: Attributes): Module = - new SynchronousFileSource(f, chunkSize, attr, amendShape(attr)) + new FileSource(f, chunkSize, attr, amendShape(attr)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index f7ec56d08a..5c5e58fabb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -33,7 +33,7 @@ private[akka] class InputStreamPublisher(is: InputStream, bytesReadPromise: Prom extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { - // TODO possibly de-duplicate with SynchronousFilePublisher? + // TODO possibly de-duplicate with FilePublisher? import InputStreamPublisher._ diff --git a/akka-stream/src/main/scala/akka/stream/io/Implicits.scala b/akka-stream/src/main/scala/akka/stream/io/Implicits.scala index dd5d313a8a..9e92f5fc62 100644 --- a/akka-stream/src/main/scala/akka/stream/io/Implicits.scala +++ b/akka-stream/src/main/scala/akka/stream/io/Implicits.scala @@ -18,7 +18,7 @@ import scala.language.implicitConversions * import akka.stream.io._ * * // explicitly using IO Source: - * SynchronousFileSource(file).map(...) + * FileSource(file).map(...) * * // using implicit conversion: * import akka.stream.io.Implicits._ @@ -29,20 +29,12 @@ object Implicits { // ---- Sources ---- - implicit final class AddSynchronousFileSource(val s: Source.type) extends AnyVal { - def synchronousFile: SynchronousFileSource.type = SynchronousFileSource - } - implicit final class AddInputStreamSource(val s: Source.type) extends AnyVal { def inputStream: InputStreamSource.type = InputStreamSource } // ---- Sinks ---- - implicit final class AddSynchronousFileSink(val s: Sink.type) extends AnyVal { - def synchronousFile: SynchronousFileSink.type = SynchronousFileSink - } - implicit final class AddOutputStreamSink(val s: Sink.type) extends AnyVal { def outputStream: OutputStreamSink.type = OutputStreamSink } diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala deleted file mode 100644 index 797348c3b8..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSink.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2014-2015 Typesafe Inc. - */ -package akka.stream.io - -import java.io.File - -import akka.stream.{ Attributes, javadsl, ActorAttributes } -import akka.stream.scaladsl.Sink -import akka.util.ByteString - -import scala.concurrent.Future - -/** - * Sink which writes incoming [[ByteString]]s to the given file - */ -object SynchronousFileSink { - import akka.stream.impl.io.IOSettings._ - import akka.stream.impl.io.SynchronousFileSink - - final val DefaultAttributes = SyncFileSinkName and IODispatcher - - /** - * Synchronous (Java 6 compatible) Sink that writes incoming [[ByteString]] elements to the given file. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. - */ - def apply(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = - new Sink(new SynchronousFileSink(f, append, DefaultAttributes, Sink.shape("SynchronousFileSink"))) - - /** - * Java API - * - * Synchronous (Java 6 compatible) Sink that writes incoming [[ByteString]] elements to the given file. - * Overwrites existing files, if you want to append to an existing file use [[#create(File, Boolean)]] instead. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. - */ - def create(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = - apply(f, append = false).asJava.asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] - - /** - * Java API - * - * Synchronous (Java 6 compatible) Sink that writes incoming [[ByteString]] elements to the given file. - * - * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. - * - * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. - */ - def appendTo(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = - apply(f, append = true).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] - -} diff --git a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala b/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala deleted file mode 100644 index 6ad5d763b8..0000000000 --- a/akka-stream/src/main/scala/akka/stream/io/SynchronousFileSource.scala +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.stream.io - -import java.io.File -import akka.stream.scaladsl.Source -import akka.stream.{ ActorAttributes, Attributes, javadsl } -import akka.util.ByteString -import scala.concurrent.Future - -object SynchronousFileSource { - import akka.stream.impl.io.IOSettings._ - import akka.stream.impl.io.SynchronousFileSource - - final val DefaultAttributes = SyncFileSourceName and IODispatcher - - /** - * Creates a synchronous (Java 6 compatible) Source from a Files contents. - * Emitted elements are `chunkSize` sized [[ByteString]] elements. - * - * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def apply(f: File, chunkSize: Int = SyncFileSourceDefaultChunkSize): Source[ByteString, Future[Long]] = - new Source(new SynchronousFileSource(f, chunkSize, DefaultAttributes, Source.shape("SynchronousFileSource")).nest()) // TO DISCUSS: I had to add wrap() here to make the name available - - /** - * Creates a synchronous (Java 6 compatible) Source from a Files contents. - * Emitted elements are [[ByteString]] elements, chunked by default by [[DefaultChunkSize]] bytes. - * - * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def create(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = create(f, SyncFileSourceDefaultChunkSize) - - /** - * Creates a synchronous (Java 6 compatible) Source from a Files contents. - * Emitted elements are `chunkSize` sized [[ByteString]] elements. - * - * This source is backed by an Actor which will use the dedicated thread-pool base dispatcher. - * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. - * - * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. - */ - def create(f: File, chunkSize: Int): javadsl.Source[ByteString, Future[java.lang.Long]] = - apply(f, chunkSize).asJava.asInstanceOf[javadsl.Source[ByteString, Future[java.lang.Long]]] -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 7298f87d77..cc9ea47723 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -3,10 +3,13 @@ */ package akka.stream.javadsl +import java.io.File + import akka.actor.{ ActorRef, Props } import akka.japi.function import akka.stream.impl.StreamLayout import akka.stream.{ javadsl, scaladsl, _ } +import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber } import scala.concurrent.duration.FiniteDuration @@ -154,6 +157,30 @@ object Sink { def queue[T](bufferSize: Int, timeout: FiniteDuration): Sink[T, SinkQueue[T]] = new Sink(scaladsl.Sink.queue(bufferSize, timeout)) + /** + * Creates a Sink that writes incoming [[ByteString]] elements to the given file. + * Overwrites existing files, if you want to append to an existing file use [[#file(File, Boolean)]] and + * pass in `true` as the Boolean argument. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def file(f: File): javadsl.Sink[ByteString, Future[java.lang.Long]] = file(f, append = false) + + /** + * Creates a Sink that writes incoming [[ByteString]] elements to the given file and either overwrites + * or appends to it. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + */ + def file(f: File, append: Boolean): javadsl.Sink[ByteString, Future[java.lang.Long]] = + new Sink(scaladsl.Sink.file(f, append)).asInstanceOf[javadsl.Sink[ByteString, Future[java.lang.Long]]] + } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 95a7829146..36786101a3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3,12 +3,15 @@ */ package akka.stream.javadsl +import java.io.File + import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } import akka.stream._ import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.stage.Stage +import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance @@ -234,6 +237,31 @@ object Source { */ def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration): Source[T, SourceQueue[T]] = new Source(scaladsl.Source.queue(bufferSize, overflowStrategy, timeout)) + + /** + * Creates a Source from a Files contents. + * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, + * except the last element, which will be up to 8192 in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def file(f: File): javadsl.Source[ByteString, Future[java.lang.Long]] = file(f, 8192) + + /** + * Creates a synchronous (Java 6 compatible) Source from a Files contents. + * Emitted elements are `chunkSize` sized [[ByteString]] elements, + * except the last element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def file(f: File, chunkSize: Int): Source[ByteString, Future[java.lang.Long]] = + new Source(scaladsl.Source.file(f, chunkSize)).asInstanceOf[Source[ByteString, Future[java.lang.Long]]] } /** @@ -1022,5 +1050,4 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap */ def log(name: String): javadsl.Source[Out, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) - } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index ef8a82e4b6..9900a85bc5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,13 +3,17 @@ */ package akka.stream.scaladsl +import java.io.File + import akka.actor.{ ActorRef, Props } import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ +import akka.stream.impl.io.FileSink import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective } import akka.stream.{ javadsl, _ } +import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec @@ -218,10 +222,22 @@ object Sink { * upstream and then stop back pressure. * * @param bufferSize The size of the buffer in element count - * @param timeout Timeout for ``SinkQueue.pull():Future[Option[T] ]`` + * @param timeout Timeout for ``SinkQueue.pull():Future[Option[T]]`` */ def queue[T](bufferSize: Int, timeout: FiniteDuration = 5.seconds): Sink[T, SinkQueue[T]] = { require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout)) } + + /** + * Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites + * or appends to it. + * + * Materializes a [[Future]] that will be completed with the size of the file (in bytes) at the streams completion. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, + * unless configured otherwise by using [[ActorAttributes]]. + */ + def file(f: File, append: Boolean = false): Sink[ByteString, Future[Long]] = + new Sink(new FileSink(f, append, DefaultAttributes.fileSink, shape("FileSink"))) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0de8b3b0e1..1856dcb567 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,13 +3,17 @@ */ package akka.stream.scaladsl +import java.io.File + import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.{ DefaultAttributes, StageModule } import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.GraphStages.TickSource +import akka.stream.impl.io.FileSource import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } import akka.stream.{ Outlet, SourceShape, _ } +import akka.util.ByteString import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec @@ -369,4 +373,17 @@ object Source { new Source(new AcknowledgeSource(bufferSize, overflowStrategy, DefaultAttributes.acknowledgeSource, shape("AcknowledgeSource"))) } + /** + * Creates a Source from a Files contents. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] containing the number of bytes read from the source file upon completion. + */ + def file(f: File, chunkSize: Int = 8192): Source[ByteString, Future[Long]] = + new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, shape("FileSource"))) + }