diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 620967cce8..53d17fca36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -4,7 +4,7 @@ package akka.stream.impl.io import java.nio.channels.FileChannel -import java.nio.file.{ Path, StandardOpenOption } +import java.nio.file.{ OpenOption, Path, StandardOpenOption } import akka.Done import akka.actor.{ ActorLogging, Deploy, Props } @@ -19,7 +19,7 @@ import scala.util.{ Failure, Success } /** INTERNAL API */ @InternalApi private[akka] object FileSubscriber { - def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = { + def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[OpenOption]) = { require(bufSize > 0, "buffer size must be > 0") Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index c1ace9c4d6..3ba9ca61b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -4,7 +4,7 @@ package akka.stream.impl.io import java.io.OutputStream -import java.nio.file.{ Path, StandardOpenOption } +import java.nio.file.{ OpenOption, Path, StandardOpenOption } import akka.annotation.InternalApi import akka.stream._ @@ -13,6 +13,7 @@ import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.ActorAttributes.Dispatcher import akka.util.ByteString +import scala.collection.immutable import scala.concurrent.{ Future, Promise } /** @@ -20,7 +21,7 @@ import scala.concurrent.{ Future, Promise } * Creates simple synchronous Sink which writes all incoming elements to the given file * (creating it before hand if necessary). */ -@InternalApi private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) +@InternalApi private[akka] final class FileSink(f: Path, options: immutable.Set[OpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) extends SinkModule[ByteString, Future[IOResult]](shape) { override protected def label: String = s"FileSink($f, $options)" diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 6183aea740..25971dc2c8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -4,13 +4,13 @@ package akka.stream.javadsl import java.io.File -import java.nio.file.{ Path, StandardOpenOption } +import java.nio.file.{ OpenOption, Path } import java.util -import akka.stream.{ scaladsl, javadsl } -import akka.stream.IOResult -import akka.util.ByteString import java.util.concurrent.CompletionStage +import akka.stream.{ IOResult, javadsl, scaladsl } +import akka.util.ByteString + import scala.collection.JavaConverters._ /** @@ -61,7 +61,7 @@ object FileIO { * @param options File open options */ @deprecated("Use `toPath` instead.", "2.4.5") - def toFile(f: File, options: util.Set[StandardOpenOption]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = + def toFile[Opt <: OpenOption](f: File, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = toPath(f.toPath) /** @@ -76,7 +76,7 @@ object FileIO { * @param f The file path to write to * @param options File open options */ - def toPath(f: Path, options: util.Set[StandardOpenOption]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = + def toPath[Opt <: OpenOption](f: Path, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet).toCompletionStage()) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index 1f0bab0615..73430d2194 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -4,7 +4,7 @@ package akka.stream.scaladsl import java.io.File -import java.nio.file.{ Path, StandardOpenOption } +import java.nio.file.{ OpenOption, Path, StandardOpenOption } import java.nio.file.StandardOpenOption._ import akka.stream.impl.Stages.DefaultAttributes @@ -28,7 +28,7 @@ object FileIO { * except the final element, which will be up to `chunkSize` in size. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. @@ -46,7 +46,7 @@ object FileIO { * except the final element, which will be up to `chunkSize` in size. * * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or - * set it for a given Source by using [[ActorAttributes]]. + * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, * and a possible exception if IO operation was not completed successfully. @@ -64,13 +64,13 @@ object FileIO { * and a possible exception if IO operation was not completed successfully. * * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. + * unless configured otherwise by using [[akka.stream.ActorAttributes]]. * * @param f the file to write to * @param options File open options, defaults to Set(WRITE, CREATE) */ @deprecated("Use `toPath` instead", "2.4.5") - def toFile(f: File, options: Set[StandardOpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = + def toFile(f: File, options: Set[OpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = toPath(f.toPath, options) /** @@ -80,11 +80,11 @@ object FileIO { * and a possible exception if IO operation was not completed successfully. * * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, - * unless configured otherwise by using [[ActorAttributes]]. + * unless configured otherwise by using [[akka.stream.ActorAttributes]]. * * @param f the file path to write to * @param options File open options, defaults to Set(WRITE, CREATE) */ - def toPath(f: Path, options: Set[StandardOpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = + def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = Sink.fromGraph(new FileSink(f, options, DefaultAttributes.fileSink, sinkShape("FileSink"))) }