diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index c527f51fb6..a6f57b977c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -65,7 +65,26 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { }, create = false) } - "by default write into existing file" in assertAllStagesStopped { + "write into existing file without wiping existing data" in assertAllStagesStopped { + targetFile { f ⇒ + def write(lines: List[String]) = + Source(lines) + .map(ByteString(_)) + .runWith(FileIO.toPath(f, Set(StandardOpenOption.WRITE, StandardOpenOption.CREATE))) + + val completion1 = write(TestLines) + Await.result(completion1, 3.seconds) + + val lastWrite = List("x" * 100) + val completion2 = write(lastWrite) + val result = Await.result(completion2, 3.seconds) + + result.count should ===(lastWrite.flatten.length) + checkFileContents(f, lastWrite.mkString("") + TestLines.mkString("").drop(100)) + } + } + + "by default replace the existing file" in assertAllStagesStopped { targetFile { f ⇒ def write(lines: List[String]) = Source(lines) @@ -80,7 +99,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { val result = Await.result(completion2, 3.seconds) result.count should ===(lastWrite.flatten.length) - checkFileContents(f, lastWrite.mkString("") + TestLines.mkString("").drop(100)) + checkFileContents(f, lastWrite.mkString("")) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 4e0fd2bba8..ad2a1f70a0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -8,7 +8,7 @@ import java.nio.file.{ OpenOption, Path } import java.util import java.util.concurrent.CompletionStage -import akka.stream.{ IOResult, javadsl, scaladsl } +import akka.stream.{ ActorAttributes, IOResult, javadsl, scaladsl } import akka.util.ByteString import scala.collection.JavaConverters._ @@ -20,7 +20,8 @@ object FileIO { /** * Creates a Sink that writes incoming [[ByteString]] elements to the given file. - * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[OpenOption])]]. + * Overwrites existing files by truncating their contents, if you want to append to an existing file use + * [[#toFile(File, util.Set[OpenOption])]] with [[java.nio.file.StandardOpenOption.APPEND]]. * * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -35,7 +36,8 @@ object FileIO { /** * Creates a Sink that writes incoming [[ByteString]] elements to the given file path. - * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[OpenOption])]]. + * Overwrites existing files by truncating their contents, if you want to append to an existing file + * [[#toPath(Path, util.Set[OpenOption])]] with [[java.nio.file.StandardOpenOption.APPEND]]. * * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -58,7 +60,7 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * @param f The file to write to - * @param options File open options + * @param options File open options, see [[java.nio.file.StandardOpenOption]] */ @deprecated("Use `toPath` instead.", "2.4.5") def toFile[Opt <: OpenOption](f: File, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = @@ -74,7 +76,7 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * @param f The file path to write to - * @param options File open options + * @param options File open options, see [[java.nio.file.StandardOpenOption]] */ def toPath[Opt <: OpenOption](f: Path, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet).toCompletionStage()) @@ -89,7 +91,7 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * @param f The file path to write to - * @param options File open options + * @param options File open options, see [[java.nio.file.StandardOpenOption]] * @param startPosition startPosition the start position to read from, defaults to 0 */ def toPath[Opt <: OpenOption](f: Path, options: util.Set[Opt], startPosition: Long): javadsl.Sink[ByteString, CompletionStage[IOResult]] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index 81b6fbe0b7..64b640f037 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -9,7 +9,7 @@ import java.nio.file.StandardOpenOption._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io._ -import akka.stream.IOResult +import akka.stream.{ ActorAttributes, IOResult } import akka.util.ByteString import scala.concurrent.Future @@ -76,7 +76,8 @@ object FileIO { Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource"))) /** - * Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files by default. + * Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files + * by truncating their contents as default. * * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -85,14 +86,15 @@ object FileIO { * unless configured otherwise by using [[akka.stream.ActorAttributes]]. * * @param f the file to write to - * @param options File open options, defaults to Set(WRITE, CREATE) + * @param options File open options, see [[java.nio.file.StandardOpenOption]], defaults to Set(WRITE, TRUNCATE_EXISTING, CREATE) */ @deprecated("Use `toPath` instead", "2.4.5") - def toFile(f: File, options: Set[OpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = + def toFile(f: File, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] = toPath(f.toPath, options) /** - * Creates a Sink which writes incoming [[ByteString]] elements to the given file path. Overwrites existing files by default. + * Creates a Sink which writes incoming [[ByteString]] elements to the given file path. Overwrites existing files + * by truncating their contents as default. * * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -101,13 +103,14 @@ object FileIO { * unless configured otherwise by using [[akka.stream.ActorAttributes]]. * * @param f the file path to write to - * @param options File open options, defaults to Set(WRITE, CREATE) + * @param options File open options, see [[java.nio.file.StandardOpenOption]], defaults to Set(WRITE, TRUNCATE_EXISTING, CREATE) */ - def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = + def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] = toPath(f, options, startPosition = 0) /** - * Creates a Sink which writes incoming [[ByteString]] elements to the given file path. Overwrites existing files by default. + * Creates a Sink which writes incoming [[ByteString]] elements to the given file path. Overwrites existing files + * by truncating their contents as default. * * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -116,7 +119,7 @@ object FileIO { * unless configured otherwise by using [[ActorAttributes]]. * * @param f the file path to write to - * @param options File open options, defaults to Set(WRITE, CREATE) + * @param options File open options, see [[java.nio.file.StandardOpenOption]], defaults to Set(WRITE, CREATE) * @param startPosition the start position to write to */ def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =