diff --git a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala index 501adbb1ae..cba982d040 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -4,7 +4,7 @@ package akka.stream.io -import java.io.{ FileInputStream, File } +import java.nio.file.{ Files, Path } import java.util.concurrent.TimeUnit import akka.{ Done, NotUsed } import akka.actor.ActorSystem @@ -28,15 +28,14 @@ class FileSourcesBenchmark { implicit val system = ActorSystem("file-sources-benchmark") implicit val materializer = ActorMaterializer() - val file: File = { + val file: Path = { val line = ByteString("x" * 2048 + "\n") - val f = File.createTempFile(getClass.getName, ".bench.tmp") - f.deleteOnExit() + val f = Files.createTempFile(getClass.getName, ".bench.tmp") val ft = Source.fromIterator(() ⇒ Iterator.continually(line)) .take(10 * 39062) // adjust as needed - .runWith(FileIO.toFile(f)) + .runWith(FileIO.toPath(f)) Await.result(ft, 30.seconds) f @@ -51,14 +50,14 @@ class FileSourcesBenchmark { @Setup def setup():Unit = { - fileChannelSource = FileIO.fromFile(file, bufSize) - fileInputStreamSource = StreamConverters.fromInputStream(() ⇒ new FileInputStream(file), bufSize) - ioSourceLinesIterator = Source.fromIterator(() ⇒ scala.io.Source.fromFile(file).getLines()).map(ByteString(_)) + fileChannelSource = FileIO.fromPath(file, bufSize) + fileInputStreamSource = StreamConverters.fromInputStream(() ⇒ Files.newInputStream(file), bufSize) + ioSourceLinesIterator = Source.fromIterator(() ⇒ scala.io.Source.fromFile(file.toFile).getLines()).map(ByteString(_)) } @TearDown def teardown(): Unit = { - file.delete() + Files.delete(file) } @TearDown diff --git a/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java b/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java index d2bbc53d3b..c738a9439c 100644 --- a/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/QuickStartDocTest.java @@ -3,7 +3,7 @@ */ package docs.stream; -import java.io.File; +import java.nio.file.Paths; import java.math.BigInteger; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -56,7 +56,7 @@ public class QuickStartDocTest { final CompletionStage result = factorials .map(num -> ByteString.fromString(num.toString() + "\n")) - .runWith(FileIO.toFile(new File("factorials.txt")), materializer); + .runWith(FileIO.toPath(Paths.get("factorials.txt")), materializer); //#transform-source //#use-transformed-sink @@ -81,7 +81,7 @@ public class QuickStartDocTest { public Sink> lineSink(String filename) { return Flow.of(String.class) .map(s -> ByteString.fromString(s.toString() + "\n")) - .toMat(FileIO.toFile(new File(filename)), Keep.right()); + .toMat(FileIO.toPath(Paths.get(filename)), Keep.right()); } //#transform-sink diff --git a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java index 00eac15585..ba1f633625 100644 --- a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java @@ -3,7 +3,9 @@ */ package docs.stream.io; -import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.io.IOException; import java.util.concurrent.CompletionStage; @@ -47,13 +49,13 @@ public class StreamFileDocTest extends AbstractJavaTest { { //#file-source - final File file = new File("example.csv"); + final Path file = Paths.get("example.csv"); //#file-source } @Test public void demonstrateMaterializingBytesWritten() throws IOException { - final File file = File.createTempFile(getClass().getName(), ".tmp"); + final Path file = Files.createTempFile(getClass().getName(), ".tmp"); try { //#file-source @@ -61,27 +63,27 @@ public class StreamFileDocTest extends AbstractJavaTest { Sink. foreach(chunk -> System.out.println(chunk.utf8String())); CompletionStage ioResult = - FileIO.fromFile(file) + FileIO.fromPath(file) .to(printlnSink) .run(mat); //#file-source } finally { - file.delete(); + Files.delete(file); } } @Test public void demonstrateSettingDispatchersInCode() throws IOException { - final File file = File.createTempFile(getClass().getName(), ".tmp"); + final Path file = Files.createTempFile(getClass().getName(), ".tmp"); try { Sink> fileSink = //#custom-dispatcher-code - FileIO.toFile(file) + FileIO.toPath(file) .withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher")); //#custom-dispatcher-code } finally { - file.delete(); + Files.delete(file); } } diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala index 4818717a64..656387967c 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/BasicDirectivesExamplesSpec.scala @@ -4,7 +4,7 @@ package docs.http.scaladsl.server.directives -import java.io.File +import java.nio.file.Paths import akka.actor.ActorSystem import akka.event.Logging @@ -190,7 +190,7 @@ class BasicDirectivesExamplesSpec extends RoutingSpec { path("sample") { complete { // internally uses the configured fileIODispatcher: - val source = FileIO.fromFile(new File("example.json")) + val source = FileIO.fromPath(Paths.get("example.json")) HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, source)) } } diff --git a/akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala index 7a47f0c8a0..bf25549841 100644 --- a/akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/QuickStartDocSpec.scala @@ -15,7 +15,7 @@ import org.scalatest._ import org.scalatest.concurrent._ import scala.concurrent._ import scala.concurrent.duration._ -import java.io.File +import java.nio.file.Paths class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures { implicit val patience = PatienceConfig(5.seconds) @@ -46,7 +46,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture val result: Future[IOResult] = factorials .map(num => ByteString(s"$num\n")) - .runWith(FileIO.toFile(new File("factorials.txt"))) + .runWith(FileIO.toPath(Paths.get("factorials.txt"))) //#transform-source //#use-transformed-sink @@ -71,7 +71,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture def lineSink(filename: String): Sink[String, Future[IOResult]] = Flow[String] .map(s => ByteString(s + "\n")) - .toMat(FileIO.toFile(new File(filename)))(Keep.right) + .toMat(FileIO.toPath(Paths.get(filename)))(Keep.right) //#transform-sink } diff --git a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index cd90d71466..95455e4599 100644 --- a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -31,9 +31,11 @@ object TwitterStreamQuickstartDocSpec { val akka = Hashtag("#akka") //#model + // format: OFF //#tweet-source val tweets: Source[Tweet, NotUsed] //#tweet-source + // format: ON = Source( Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") :: diff --git a/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala index b66e58ed63..14fda3fee2 100644 --- a/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/io/StreamFileDocSpec.scala @@ -3,10 +3,10 @@ */ package docs.stream.io -import java.io.File +import java.nio.file.{ Files, Paths } import akka.stream._ -import akka.stream.scaladsl.{ FileIO, Sink, Source } +import akka.stream.scaladsl.{ FileIO, Sink } import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.util.ByteString @@ -22,9 +22,9 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { // silence sysout def println(s: String) = () - val file = File.createTempFile(getClass.getName, ".tmp") + val file = Files.createTempFile(getClass.getName, ".tmp") - override def afterTermination() = file.delete() + override def afterTermination() = Files.delete(file) { //#file-source @@ -35,7 +35,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { { //#file-source - val file = new File("example.csv") + val file = Paths.get("example.csv") //#file-source } @@ -46,7 +46,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { //#file-source - val foreach: Future[IOResult] = FileIO.fromFile(file) + val foreach: Future[IOResult] = FileIO.fromPath(file) .to(Sink.ignore) .run() //#file-source @@ -54,7 +54,7 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) { "configure dispatcher in code" in { //#custom-dispatcher-code - FileIO.fromFile(file) + FileIO.fromPath(file) .withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher")) //#custom-dispatcher-code } diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java index 37b3f42abe..ff221c51ac 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpEntities.java @@ -5,6 +5,7 @@ package akka.http.javadsl.model; import java.io.File; +import java.nio.file.Path; import akka.http.impl.util.JavaAccessors; import akka.http.scaladsl.model.HttpEntity; @@ -42,14 +43,30 @@ public final class HttpEntities { return HttpEntity$.MODULE$.apply((akka.http.scaladsl.model.ContentType) contentType, bytes); } + /** + * @deprecated Will be removed in Akka 3.x, use {@link #create(ContentType, Path)} instead. + */ + @Deprecated public static UniversalEntity create(ContentType contentType, File file) { return JavaAccessors.HttpEntity(contentType, file); } + public static UniversalEntity create(ContentType contentType, Path file) { + return JavaAccessors.HttpEntity(contentType, file); + } + + /** + * @deprecated Will be removed in Akka 3.x, use {@link #create(ContentType, Path, int)} instead. + */ + @Deprecated public static UniversalEntity create(ContentType contentType, File file, int chunkSize) { return HttpEntity$.MODULE$.apply((akka.http.scaladsl.model.ContentType) contentType, file, chunkSize); } + public static UniversalEntity create(ContentType contentType, Path file, int chunkSize) { + return HttpEntity$.MODULE$.fromPath((akka.http.scaladsl.model.ContentType) contentType, file, chunkSize); + } + public static HttpEntity.Default create(ContentType contentType, long contentLength, Source data) { return new akka.http.scaladsl.model.HttpEntity.Default((akka.http.scaladsl.model.ContentType) contentType, contentLength, data.asScala()); } diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java index a3032f814c..71b873ce81 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/HttpMessage.java @@ -5,12 +5,17 @@ package akka.http.javadsl.model; import akka.util.ByteString; - import java.io.File; +import java.nio.file.Path; import java.util.Optional; /** * The base type for an Http message (request or response). + * + * INTERNAL API: this trait will be changed in binary-incompatible ways for classes that are derived from it! + * Do not implement this interface outside the Akka code base! + * + * Binary compatibility is only maintained for callers of this trait’s interface. */ public interface HttpMessage { /** @@ -103,9 +108,17 @@ public interface HttpMessage { /** * Returns a copy of Self message with a new entity. + * + * @deprecated Will be removed in Akka 3.x, use {@link #withEntity(ContentType, Path)} instead. */ + @Deprecated Self withEntity(ContentType type, File file); + /** + * Returns a copy of Self message with a new entity. + */ + Self withEntity(ContentType type, Path file); + /** * Returns a copy of Self message with a new entity. */ diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/JavaAccessors.scala b/akka-http-core/src/main/scala/akka/http/impl/util/JavaAccessors.scala index 526b3df545..28dd842c3a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/JavaAccessors.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/JavaAccessors.scala @@ -5,6 +5,7 @@ package akka.http.impl.util import java.io.File +import java.nio.file.Path import JavaMapping.Implicits._ import akka.http.javadsl.model._ @@ -28,4 +29,8 @@ object JavaAccessors { /** INTERNAL API */ def HttpEntity(contentType: ContentType, file: File): UniversalEntity = model.HttpEntity(contentType.asScala, file) + + /** INTERNAL API */ + def HttpEntity(contentType: ContentType, file: Path): UniversalEntity = + model.HttpEntity.fromPath(contentType.asScala, file) } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index dce014a8ae..dad23d5d42 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -10,12 +10,13 @@ import akka.http.impl.model.JavaInitialization import language.implicitConversions import java.io.File +import java.nio.file.{ Path, Files } import java.lang.{ Iterable ⇒ JIterable} import scala.util.control.NonFatal import scala.concurrent.Future import scala.concurrent.duration._ import scala.collection.immutable -import akka.util.{Unsafe, ByteString} +import akka.util.ByteString import akka.stream.scaladsl._ import akka.stream.stage._ import akka.stream._ @@ -242,11 +243,22 @@ object HttpEntity { * * If the given `chunkSize` is -1 the default chunk size is used. */ - def apply(contentType: ContentType, file: File, chunkSize: Int = -1): UniversalEntity = { - val fileLength = file.length + @deprecated("Use `fromPath` instead", "2.4.5") + def apply(contentType: ContentType, file: File, chunkSize: Int = -1): UniversalEntity = + fromPath(contentType, file.toPath, chunkSize) + + /** + * Returns either the empty entity, if the given file is empty, or a [[HttpEntity.Default]] entity + * consisting of a stream of [[akka.util.ByteString]] instances each containing `chunkSize` bytes + * (except for the final ByteString, which simply contains the remaining bytes). + * + * If the given `chunkSize` is -1 the default chunk size is used. + */ + def fromPath(contentType: ContentType, file: Path, chunkSize: Int = -1): UniversalEntity = { + val fileLength = Files.size(file) if (fileLength > 0) HttpEntity.Default(contentType, fileLength, - if (chunkSize > 0) FileIO.fromFile(file, chunkSize) else FileIO.fromFile(file)) + if (chunkSize > 0) FileIO.fromPath(file, chunkSize) else FileIO.fromPath(file)) else empty(contentType) } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala index 01dcb3be99..37cf083fe6 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpMessage.scala @@ -4,6 +4,8 @@ package akka.http.scaladsl.model +import java.io.File +import java.nio.file.Path import java.lang.{ Iterable ⇒ JIterable } import java.util.Optional @@ -107,7 +109,10 @@ sealed trait HttpMessage extends jm.HttpMessage { withEntity(HttpEntity(contentType.asInstanceOf[ContentType.NonBinary], string)) def withEntity(contentType: jm.ContentType, bytes: Array[Byte]): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], bytes)) def withEntity(contentType: jm.ContentType, bytes: ByteString): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], bytes)) - def withEntity(contentType: jm.ContentType, file: java.io.File): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], file)) + + @deprecated("Use withEntity(ContentType, Path) instead", "2.4.5") + def withEntity(contentType: jm.ContentType, file: File): Self = withEntity(HttpEntity(contentType.asInstanceOf[ContentType], file)) + def withEntity(contentType: jm.ContentType, file: Path): Self = withEntity(HttpEntity.fromPath(contentType.asInstanceOf[ContentType], file)) import collection.JavaConverters._ /** Java API */ diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala index a6f95befde..f7d3a8944d 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala @@ -3,8 +3,8 @@ */ package akka.http.scaladsl.model - import java.io.File +import java.nio.file.Path import java.util.Optional import akka.http.impl.util.Util import scala.concurrent.duration.FiniteDuration @@ -344,8 +344,18 @@ object Multipart { * To create an instance with several parts or for multiple files, use * `FormData(BodyPart.fromFile("field1", ...), BodyPart.fromFile("field2", ...)` */ + @deprecated("Use `fromPath` instead", "2.4.5") def fromFile(name: String, contentType: ContentType, file: File, chunkSize: Int = -1): Multipart.FormData = - Multipart.FormData(Source.single(Multipart.FormData.BodyPart.fromFile(name, contentType, file, chunkSize))) + fromPath(name, contentType, file.toPath, chunkSize) + + /** + * Creates a FormData instance that contains a single part backed by the given file. + * + * To create an instance with several parts or for multiple files, use + * `FormData(BodyPart.fromPath("field1", ...), BodyPart.fromPath("field2", ...)` + */ + def fromPath(name: String, contentType: ContentType, file: Path, chunkSize: Int = -1): Multipart.FormData = + Multipart.FormData(Source.single(Multipart.FormData.BodyPart.fromPath(name, contentType, file, chunkSize))) /** * Strict [[FormData]]. @@ -432,8 +442,15 @@ object Multipart { /** * Creates a BodyPart backed by a File that will be streamed using a FileSource. */ + @deprecated("Use `fromPath` instead", since = "2.4.5") def fromFile(name: String, contentType: ContentType, file: File, chunkSize: Int = -1): BodyPart = - BodyPart(name, HttpEntity(contentType, file, chunkSize), Map("filename" -> file.getName)) + fromPath(name, contentType, file.toPath, chunkSize) + + /** + * Creates a BodyPart backed by a file that will be streamed using a FileSource. + */ + def fromPath(name: String, contentType: ContentType, file: Path, chunkSize: Int = -1): BodyPart = + BodyPart(name, HttpEntity.fromPath(contentType, file, chunkSize), Map("filename" -> file.getFileName.toString)) def unapply(value: BodyPart): Option[(String, BodyPartEntity, Map[String, String], immutable.Seq[HttpHeader])] = Some((value.name, value.entity, value.additionalDispositionParams, value.additionalHeaders)) diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala index 265b693af6..9cb20ba183 100644 --- a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala +++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FilePublisherTest.scala @@ -3,11 +3,10 @@ */ package akka.stream.tck -import java.io.{ File, FileWriter } +import java.nio.file.Files import akka.actor.ActorSystem import akka.event.Logging -import akka.stream.scaladsl.FileIO -import akka.stream.scaladsl.{ Sink } +import akka.stream.scaladsl.{ Sink, FileIO } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.testkit.{ EventFilter, TestEvent } @@ -28,21 +27,22 @@ class FilePublisherTest extends AkkaPublisherVerification[ByteString] { } val file = { - val f = File.createTempFile("file-source-tck", ".tmp") + val f = Files.createTempFile("file-source-tck", ".tmp") val chunk = "x" * ChunkSize - val fw = new FileWriter(f) + + val fw = Files.newBufferedWriter(f) for (i ← 1 to Elements) fw.append(chunk) fw.close() f } def createPublisher(elements: Long): Publisher[ByteString] = - FileIO.fromFile(file, chunkSize = 512) + FileIO.fromPath(file, chunkSize = 512) .take(elements) .runWith(Sink.asPublisher(false)) @AfterClass - def after = file.delete() + def after() = Files.delete(file) override def maxElementsFromPublisher(): Long = Elements } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 823d0f5fdf..7ac81dd9d5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -3,8 +3,7 @@ */ package akka.stream.io -import java.io.File -import java.nio.file.StandardOpenOption +import java.nio.file.{ Files, Path, StandardOpenOption } import akka.actor.ActorSystem import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor @@ -15,8 +14,7 @@ import akka.stream.testkit.Utils._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.ActorAttributes -import akka.util.ByteString -import akka.util.Timeout +import akka.util.{ ByteString, Timeout } import scala.collection.mutable.ListBuffer import scala.concurrent.Await import scala.concurrent.duration._ @@ -40,11 +38,11 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val TestByteStrings = TestLines.map(ByteString(_)) - "SynchronousFile Sink" must { + "FileSink" must { "write lines to a file" in assertAllStagesStopped { targetFile { f ⇒ val completion = Source(TestByteStrings) - .runWith(FileIO.toFile(f)) + .runWith(FileIO.toPath(f)) val result = Await.result(completion, 3.seconds) result.count should equal(6006) @@ -55,7 +53,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "create new file if not exists" in assertAllStagesStopped { targetFile({ f ⇒ val completion = Source(TestByteStrings) - .runWith(FileIO.toFile(f)) + .runWith(FileIO.toPath(f)) val result = Await.result(completion, 3.seconds) result.count should equal(6006) @@ -68,7 +66,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { def write(lines: List[String]) = Source(lines) .map(ByteString(_)) - .runWith(FileIO.toFile(f)) + .runWith(FileIO.toPath(f)) val completion1 = write(TestLines) Await.result(completion1, 3.seconds) @@ -87,7 +85,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { def write(lines: List[String] = TestLines) = Source(lines) .map(ByteString(_)) - .runWith(FileIO.toFile(f, Set(StandardOpenOption.APPEND))) + .runWith(FileIO.toPath(f, Set(StandardOpenOption.APPEND))) val completion1 = write() val result1 = Await.result(completion1, 3.seconds) @@ -96,8 +94,8 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val completion2 = write(lastWrite) val result2 = Await.result(completion2, 3.seconds) - f.length() should ===(result1.count + result2.count) - checkFileContents(f, TestLines.mkString("") + lastWrite.mkString("") + "\n") + Files.size(f) should ===(result1.count + result2.count) + checkFileContents(f, TestLines.mkString("") + lastWrite.mkString("")) } } @@ -106,7 +104,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) try { - Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(FileIO.toFile(f))(materializer) + Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get @@ -125,7 +123,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { try { Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)) - .to(FileIO.toFile(f)) + .to(FileIO.toPath(f)) .withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")) .run()(materializer) @@ -138,17 +136,15 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } - private def targetFile(block: File ⇒ Unit, create: Boolean = true) { - val targetFile = File.createTempFile("synchronous-file-sink", ".tmp") - if (!create) targetFile.delete() - try block(targetFile) finally targetFile.delete() + private def targetFile(block: Path ⇒ Unit, create: Boolean = true) { + val targetFile = Files.createTempFile("synchronous-file-sink", ".tmp") + if (!create) Files.delete(targetFile) + try block(targetFile) finally Files.delete(targetFile) } - def checkFileContents(f: File, contents: String): Unit = { - val s = scala.io.Source.fromFile(f) - val out = s.getLines().mkString("\n") + "\n" - s.close() - out should ===(contents) + def checkFileContents(f: Path, contents: String): Unit = { + val out = Files.readAllBytes(f) + new String(out) should ===(contents) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index efe4f9823f..1311dd3ff7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -3,7 +3,8 @@ */ package akka.stream.io -import java.io.File +import java.nio.file.Files +import java.nio.charset.StandardCharsets.UTF_8 import java.util.Random import akka.actor.ActorSystem import akka.stream.ActorMaterializer @@ -23,9 +24,6 @@ import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ import akka.testkit.AkkaSpec -import java.io.OutputStreamWriter -import java.io.FileOutputStream -import java.nio.charset.StandardCharsets.UTF_8 object FileSourceSpec { final case class Settings(chunkSize: Int, readAhead: Int) @@ -46,23 +44,23 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } val testFile = { - val f = File.createTempFile("file-source-spec", ".tmp") - new OutputStreamWriter(new FileOutputStream(f), UTF_8).append(TestText).close() + val f = Files.createTempFile("file-source-spec", ".tmp") + Files.newBufferedWriter(f, UTF_8).append(TestText).close() f } val notExistingFile = { // this way we make sure it doesn't accidentally exist - val f = File.createTempFile("not-existing-file", ".tmp") - f.delete() + val f = Files.createTempFile("not-existing-file", ".tmp") + Files.delete(f) f } val LinesCount = 2000 + new Random().nextInt(300) val manyLines = { - val f = File.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp") - val w = new OutputStreamWriter(new FileOutputStream(f), UTF_8) + val f = Files.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp") + val w = Files.newBufferedWriter(f, UTF_8) (1 to LinesCount).foreach { l ⇒ w.append("a" * l).append("\n") } @@ -70,12 +68,12 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { f } - "File Source" must { + "FileSource" must { "read contents from a file" in assertAllStagesStopped { val chunkSize = 512 val bufferAttributes = Attributes.inputBuffer(1, 2) - val p = FileIO.fromFile(testFile, chunkSize) + val p = FileIO.fromPath(testFile, chunkSize) .withAttributes(bufferAttributes) .runWith(Sink.asPublisher(false)) val c = TestSubscriber.manualProbe[ByteString]() @@ -112,7 +110,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val demandAllButOneChunks = TestText.length / chunkSize - 1 - val p = FileIO.fromFile(testFile, chunkSize) + val p = FileIO.fromPath(testFile, chunkSize) .withAttributes(bufferAttributes) .runWith(Sink.asPublisher(false)) @@ -141,7 +139,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "onError whent trying to read from file which does not exist" in assertAllStagesStopped { - val p = FileIO.fromFile(notExistingFile).runWith(Sink.asPublisher(false)) + val p = FileIO.fromPath(notExistingFile).runWith(Sink.asPublisher(false)) val c = TestSubscriber.manualProbe[ByteString]() p.subscribe(c) @@ -157,7 +155,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { import settings._ s"count lines in real file (chunkSize = $chunkSize, readAhead = $readAhead)" in { - val s = FileIO.fromFile(manyLines, chunkSize = chunkSize) + val s = FileIO.fromPath(manyLines, chunkSize = chunkSize) .withAttributes(Attributes.inputBuffer(readAhead, readAhead)) val f = s.runWith(Sink.fold(0) { case (acc, l) ⇒ acc + l.utf8String.count(_ == '\n') }) @@ -171,7 +169,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) try { - val p = FileIO.fromFile(manyLines).runWith(TestSink.probe)(materializer) + val p = FileIO.fromPath(manyLines).runWith(TestSink.probe)(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get @@ -187,7 +185,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val timeout = Timeout(500.millis) try { - val p = FileIO.fromFile(manyLines) + val p = FileIO.fromPath(manyLines) .withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")) .runWith(TestSink.probe)(materializer) @@ -198,7 +196,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } "not signal onComplete more than once" in { - FileIO.fromFile(testFile, 2 * TestText.length) + FileIO.fromPath(testFile, 2 * TestText.length) .runWith(TestSink.probe) .requestNext(ByteString(TestText, UTF_8.name)) .expectComplete() @@ -207,8 +205,8 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } override def afterTermination(): Unit = { - testFile.delete() - manyLines.delete() + Files.delete(testFile) + Files.delete(manyLines) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index 0f020a1fc2..b225ef0834 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -3,9 +3,9 @@ */ package akka.stream.impl.io -import java.io.File import java.nio.ByteBuffer import java.nio.channels.FileChannel +import java.nio.file.Path import akka.Done import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } @@ -20,7 +20,7 @@ import scala.util.control.NonFatal /** INTERNAL API */ private[akka] object FilePublisher { - def props(f: File, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { + def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") @@ -35,7 +35,7 @@ private[akka] object FilePublisher { } /** INTERNAL API */ -private[akka] final class FilePublisher(f: File, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) +private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { import FilePublisher._ @@ -49,7 +49,7 @@ private[akka] final class FilePublisher(f: File, completionPromise: Promise[IORe override def preStart() = { try { - chan = FileChannel.open(f.toPath, FilePublisher.Read) + chan = FileChannel.open(f, FilePublisher.Read) } catch { case ex: Exception ⇒ onErrorThenStop(ex) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 98c77f422f..281dde342c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -3,9 +3,8 @@ */ package akka.stream.impl.io -import java.io.File import java.nio.channels.FileChannel -import java.nio.file.StandardOpenOption +import java.nio.file.{ Path, StandardOpenOption } import akka.Done import akka.actor.{ Deploy, ActorLogging, Props } @@ -19,14 +18,14 @@ import scala.util.{ Failure, Success } /** INTERNAL API */ private[akka] object FileSubscriber { - def props(f: File, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = { + def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = { require(bufSize > 0, "buffer size must be > 0") Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local) } } /** INTERNAL API */ -private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) +private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) extends akka.stream.actor.ActorSubscriber with ActorLogging { @@ -37,7 +36,7 @@ private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult] private var bytesWritten: Long = 0 override def preStart(): Unit = try { - chan = FileChannel.open(f.toPath, openOptions.asJava) + chan = FileChannel.open(f, openOptions.asJava) super.preStart() } catch { @@ -57,7 +56,7 @@ private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult] } case ActorSubscriberMessage.OnError(ex) ⇒ - log.error(ex, "Tearing down FileSink({}) due to upstream error", f.getAbsolutePath) + log.error(ex, "Tearing down FileSink({}) due to upstream error", f) closeAndComplete(IOResult(bytesWritten, Failure(ex))) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index ebe6d55500..d0f8db5a46 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -3,8 +3,8 @@ */ package akka.stream.impl.io -import java.io.{ File, OutputStream } -import java.nio.file.StandardOpenOption +import java.io.OutputStream +import java.nio.file.{ Path, StandardOpenOption } import akka.stream.IOResult import akka.stream.impl.SinkModule import akka.stream.impl.StreamLayout.Module @@ -16,10 +16,10 @@ import scala.concurrent.{ Future, Promise } /** * INTERNAL API - * Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file + * Creates simple synchronous Sink which writes all incoming elements to the given file * (creating it before hand if necessary). */ -private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) +private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) extends SinkModule[ByteString, Future[IOResult]](shape) { override protected def label: String = s"FileSink($f, $options)" @@ -45,8 +45,7 @@ private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], va /** * INTERNAL API - * Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file - * (creating it before hand if necessary). + * Creates simple synchronous Sink which writes all incoming elements to the output stream. */ private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean) extends SinkModule[ByteString, Future[IOResult]](shape) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 2a080cdba9..7fec482d18 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -3,7 +3,8 @@ */ package akka.stream.impl.io -import java.io.{ File, InputStream } +import java.io.InputStream +import java.nio.file.Path import akka.stream._ import akka.stream.ActorAttributes.Dispatcher @@ -17,9 +18,9 @@ import scala.concurrent.{ Future, Promise } /** * INTERNAL API - * Creates simple synchronous (Java 6 compatible) Source backed by the given file. + * Creates simple synchronous Source backed by the given file. */ -private[akka] final class FileSource(f: File, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) +private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) extends SourceModule[ByteString, Future[IOResult]](shape) { require(chunkSize > 0, "chunkSize must be greater than 0") override def create(context: MaterializationContext) = { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 359f7cdd7f..0b452c4bfc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -4,7 +4,7 @@ package akka.stream.javadsl import java.io.File -import java.nio.file.StandardOpenOption +import java.nio.file.{ Path, StandardOpenOption } import java.util import akka.stream.{ scaladsl, javadsl } import akka.stream.IOResult @@ -20,7 +20,7 @@ object FileIO { /** * Creates a Sink that writes incoming [[ByteString]] elements to the given file. - * Overwrites existing files, if you want to append to an existing file use [[#file(File, util.Set[StandardOpenOption])]]. + * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]]. * * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -30,11 +30,26 @@ object FileIO { * * @param f The file to write to */ - def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] = - new Sink(scaladsl.FileIO.toFile(f).toCompletionStage()) + @deprecated("Use `toPath` instead.", "2.4.5") + def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] = toPath(f.toPath) /** - * Creates a Sink that writes incoming [[ByteString]] elements to the given file + * Creates a Sink that writes incoming [[ByteString]] elements to the given file. + * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]]. + * + * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f The file to write to + */ + def toPath(f: Path): javadsl.Sink[ByteString, CompletionStage[IOResult]] = + new Sink(scaladsl.FileIO.toPath(f).toCompletionStage()) + + /** + * Creates a Sink that writes incoming [[ByteString]] elements to the given file. * * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -45,11 +60,27 @@ object FileIO { * @param f The file to write to * @param options File open options */ + @deprecated("Use `toPath` instead.", "2.4.5") def toFile(f: File, options: util.Set[StandardOpenOption]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = - new Sink(scaladsl.FileIO.toFile(f, options.asScala.toSet).toCompletionStage()) + toPath(f.toPath) /** - * Creates a Source from a Files contents. + * Creates a Sink that writes incoming [[ByteString]] elements to the given file. + * + * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f The file to write to + * @param options File open options + */ + def toPath(f: Path, options: util.Set[StandardOpenOption]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = + new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet).toCompletionStage()) + + /** + * Creates a Source from a files contents. * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, * except the last element, which will be up to 8192 in size. * @@ -59,10 +90,24 @@ object FileIO { * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. */ - def fromFile(f: File): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromFile(f, 8192) + @deprecated("Use `fromPath` instead.", "2.4.5") + def fromFile(f: File): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f.toPath) /** - * Creates a synchronous (Java 6 compatible) Source from a Files contents. + * Creates a Source from a files contents. + * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, + * except the last element, which will be up to 8192 in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. + */ + def fromPath(f: Path): javadsl.Source[ByteString, CompletionStage[IOResult]] = fromPath(f, 8192) + + /** + * Creates a synchronous Source from a files contents. * Emitted elements are `chunkSize` sized [[ByteString]] elements, * except the last element, which will be up to `chunkSize` in size. * @@ -72,7 +117,21 @@ object FileIO { * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. */ + @deprecated("Use `fromPath` instead.", "2.4.5") def fromFile(f: File, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] = - new Source(scaladsl.FileIO.fromFile(f, chunkSize).toCompletionStage()) + fromPath(f.toPath, chunkSize) + /** + * Creates a synchronous Source from a files contents. + * Emitted elements are `chunkSize` sized [[ByteString]] elements, + * except the last element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. + */ + def fromPath(f: Path, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] = + new Source(scaladsl.FileIO.fromPath(f, chunkSize).toCompletionStage()) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index 3428ea575f..69942f543d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -4,7 +4,7 @@ package akka.stream.scaladsl import java.io.File -import java.nio.file.StandardOpenOption +import java.nio.file.{ Path, StandardOpenOption } import java.nio.file.StandardOpenOption._ import akka.stream.impl.Stages.DefaultAttributes @@ -23,7 +23,7 @@ object FileIO { import Source.{ shape ⇒ sourceShape } /** - * Creates a Source from a Files contents. + * Creates a Source from a files contents. * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, * except the final element, which will be up to `chunkSize` in size. * @@ -33,10 +33,28 @@ object FileIO { * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. * - * @param f the File to read from + * @param f the file to read from * @param chunkSize the size of each read operation, defaults to 8192 */ + @deprecated("Use `fromPath` instead", "2.4.5") def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = + fromPath(f.toPath, chunkSize) + + /** + * Creates a Source from a files contents. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. + * + * @param f the file to read from + * @param chunkSize the size of each read operation, defaults to 8192 + */ + def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, sourceShape("FileSource"))) /** @@ -48,9 +66,25 @@ object FileIO { * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, * unless configured otherwise by using [[ActorAttributes]]. * - * @param f the File to write to + * @param f the file to write to * @param options File open options, defaults to Set(WRITE, CREATE) */ + @deprecated("Use `toPath` instead", "2.4.5") def toFile(f: File, options: Set[StandardOpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = + toPath(f.toPath, options) + + /** + * Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files by default. + * + * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, + * unless configured otherwise by using [[ActorAttributes]]. + * + * @param f the file to write to + * @param options File open options, defaults to Set(WRITE, CREATE) + */ + def toPath(f: Path, options: Set[StandardOpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = new Sink(new FileSink(f, options, DefaultAttributes.fileSink, sinkShape("FileSink"))) } diff --git a/project/MiMa.scala b/project/MiMa.scala index f948d50454..1222759532 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -746,7 +746,11 @@ object MiMa extends AutoPlugin { "2.4.4" -> Seq( // #20342 HttpEntity scaladsl overrides ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.withoutSizeLimit"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.withSizeLimit") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpEntity.withSizeLimit"), + + // #20293 Use JDK7 NIO Path instead of File + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.HttpMessage#MessageTransformations.withEntity"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.HttpMessage.withEntity") ) ) }