diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 3ca7cd9780..c527f51fb6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -3,6 +3,7 @@ */ package akka.stream.io +import java.nio.file.StandardOpenOption.{ CREATE, WRITE } import java.nio.file.{ Files, Path, StandardOpenOption } import akka.actor.ActorSystem @@ -102,6 +103,43 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { } } + "allow writing from specific position to the file" in assertAllStagesStopped { + targetFile { f ⇒ + val TestLinesCommon = { + val b = ListBuffer[String]() + b.append("a" * 1000 + "\n") + b.append("b" * 1000 + "\n") + b.append("c" * 1000 + "\n") + b.append("d" * 1000 + "\n") + b.toList + } + + val commonByteString = TestLinesCommon.map(ByteString(_)).foldLeft[ByteString](ByteString.empty)((acc, line) ⇒ acc ++ line).compact + val startPosition = commonByteString.size + + val testLinesPart2: List[String] = { + val b = ListBuffer[String]() + b.append("x" * 1000 + "\n") + b.append("x" * 1000 + "\n") + b.toList + } + + def write(lines: List[String] = TestLines, startPosition: Long = 0) = + Source(lines) + .map(ByteString(_)) + .runWith(FileIO.toPath(f, options = Set(WRITE, CREATE), startPosition = startPosition)) + + val completion1 = write() + val result1 = Await.result(completion1, 3.seconds) + + val completion2 = write(testLinesPart2, startPosition) + val result2 = Await.result(completion2, 3.seconds) + + Files.size(f) should ===(startPosition + result2.count) + checkFileContents(f, TestLinesCommon.mkString("") + testLinesPart2.mkString("")) + } + } + "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { targetFile { f ⇒ val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index 607deff6a0..e392051a31 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -108,6 +108,34 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { c.expectComplete() } + "read partial contents from a file" in assertAllStagesStopped { + val chunkSize = 512 + val startPosition = 1000 + val bufferAttributes = Attributes.inputBuffer(1, 2) + + val p = FileIO.fromPath(testFile, chunkSize, startPosition) + .withAttributes(bufferAttributes) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[ByteString]() + p.subscribe(c) + val sub = c.expectSubscription() + + var remaining = TestText.drop(1000) + def nextChunk() = { + val (chunk, rest) = remaining.splitAt(chunkSize) + remaining = rest + chunk + } + + sub.request(5000) + + for (_ ← 1 to 10) { + c.expectNext().utf8String should ===(nextChunk().toString) + } + + c.expectComplete() + } + "complete only when all contents of a file have been signalled" in assertAllStagesStopped { val chunkSize = 256 val bufferAttributes = Attributes.inputBuffer(4, 8) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index 572d0c6e4b..ae4e82a79e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -21,12 +21,13 @@ import scala.util.control.NonFatal /** INTERNAL API */ @InternalApi private[akka] object FilePublisher { - def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { + def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, startPosition: Long, initialBuffer: Int, maxBuffer: Int) = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") + require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)") require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)") - Props(classOf[FilePublisher], f, completionPromise, chunkSize, initialBuffer, maxBuffer) + Props(classOf[FilePublisher], f, completionPromise, chunkSize, startPosition, initialBuffer, maxBuffer) .withDeploy(Deploy.local) } @@ -36,7 +37,7 @@ import scala.util.control.NonFatal } /** INTERNAL API */ -@InternalApi private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) +@InternalApi private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, startPosition: Long, initialBuffer: Int, maxBuffer: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { import FilePublisher._ @@ -51,6 +52,9 @@ import scala.util.control.NonFatal override def preStart() = { try { chan = FileChannel.open(f, FilePublisher.Read) + if (startPosition > 0) { + chan.position(startPosition) + } } catch { case NonFatal(ex) ⇒ completionPromise.trySuccess(IOResult(0L, Failure(ex))) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 53d17fca36..6d71850179 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -4,7 +4,7 @@ package akka.stream.impl.io import java.nio.channels.FileChannel -import java.nio.file.{ OpenOption, Path, StandardOpenOption } +import java.nio.file.{ Path, OpenOption } import akka.Done import akka.actor.{ ActorLogging, Deploy, Props } @@ -19,14 +19,15 @@ import scala.util.{ Failure, Success } /** INTERNAL API */ @InternalApi private[akka] object FileSubscriber { - def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[OpenOption]) = { + def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, startPosition: Long, openOptions: Set[OpenOption]) = { require(bufSize > 0, "buffer size must be > 0") - Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local) + require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)") + Props(classOf[FileSubscriber], f, completionPromise, bufSize, startPosition, openOptions).withDeploy(Deploy.local) } } /** INTERNAL API */ -@InternalApi private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) +@InternalApi private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, startPosition: Long, openOptions: Set[OpenOption]) extends akka.stream.actor.ActorSubscriber with ActorLogging { @@ -38,6 +39,9 @@ import scala.util.{ Failure, Success } override def preStart(): Unit = try { chan = FileChannel.open(f, openOptions.asJava) + if (startPosition > 0) { + chan.position(startPosition) + } super.preStart() } catch { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index 3ba9ca61b7..e5887f76b2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -4,7 +4,7 @@ package akka.stream.impl.io import java.io.OutputStream -import java.nio.file.{ OpenOption, Path, StandardOpenOption } +import java.nio.file.{ Path, OpenOption } import akka.annotation.InternalApi import akka.stream._ @@ -21,7 +21,7 @@ import scala.concurrent.{ Future, Promise } * Creates simple synchronous Sink which writes all incoming elements to the given file * (creating it before hand if necessary). */ -@InternalApi private[akka] final class FileSink(f: Path, options: immutable.Set[OpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) +@InternalApi private[akka] final class FileSink(f: Path, startPosition: Long, options: immutable.Set[OpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) extends SinkModule[ByteString, Future[IOResult]](shape) { override protected def label: String = s"FileSink($f, $options)" @@ -31,7 +31,7 @@ import scala.concurrent.{ Future, Promise } val settings = materializer.effectiveSettings(context.effectiveAttributes) val ioResultPromise = Promise[IOResult]() - val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, options) + val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, startPosition, options) val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher val ref = materializer.actorOf(context, props.withDispatcher(dispatcher)) @@ -39,10 +39,10 @@ import scala.concurrent.{ Future, Promise } } override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] = - new FileSink(f, options, attributes, shape) + new FileSink(f, startPosition, options, attributes, shape) override def withAttributes(attr: Attributes): SinkModule[ByteString, Future[IOResult]] = - new FileSink(f, options, attr, amendShape(attr)) + new FileSink(f, startPosition, options, attr, amendShape(attr)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 342330095f..fbc4427e22 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -21,16 +21,17 @@ import scala.concurrent.{ Future, Promise } * INTERNAL API * Creates simple synchronous Source backed by the given file. */ -@InternalApi private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) +@InternalApi private[akka] final class FileSource(f: Path, chunkSize: Int, startPosition: Long, val attributes: Attributes, shape: SourceShape[ByteString]) extends SourceModule[ByteString, Future[IOResult]](shape) { require(chunkSize > 0, "chunkSize must be greater than 0") + require(startPosition >= 0, "startPosition must be equal or greater than 0") override def create(context: MaterializationContext) = { // FIXME rewrite to be based on GraphStage rather than dangerous downcasts val materializer = ActorMaterializerHelper.downcast(context.materializer) val settings = materializer.effectiveSettings(context.effectiveAttributes) val ioResultPromise = Promise[IOResult]() - val props = FilePublisher.props(f, ioResultPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize) + val props = FilePublisher.props(f, ioResultPromise, chunkSize, startPosition, settings.initialInputBufferSize, settings.maxInputBufferSize) val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher val ref = materializer.actorOf(context, props.withDispatcher(dispatcher)) @@ -39,10 +40,10 @@ import scala.concurrent.{ Future, Promise } } override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[IOResult]] = - new FileSource(f, chunkSize, attributes, shape) + new FileSource(f, chunkSize, startPosition, attributes, shape) override def withAttributes(attr: Attributes): SourceModule[ByteString, Future[IOResult]] = - new FileSource(f, chunkSize, attr, amendShape(attr)) + new FileSource(f, chunkSize, startPosition, attr, amendShape(attr)) override protected def label: String = s"FileSource($f, $chunkSize)" } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 25971dc2c8..4e0fd2bba8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -20,7 +20,7 @@ object FileIO { /** * Creates a Sink that writes incoming [[ByteString]] elements to the given file. - * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]]. + * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[OpenOption])]]. * * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -35,7 +35,7 @@ object FileIO { /** * Creates a Sink that writes incoming [[ByteString]] elements to the given file path. - * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]]. + * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[OpenOption])]]. * * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, * and a possible exception if IO operation was not completed successfully. @@ -79,6 +79,22 @@ object FileIO { def toPath[Opt <: OpenOption](f: Path, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] = new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet).toCompletionStage()) + /** + * Creates a Sink that writes incoming [[ByteString]] elements to the given file path. + * + * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param f The file path to write to + * @param options File open options + * @param startPosition startPosition the start position to read from, defaults to 0 + */ + def toPath[Opt <: OpenOption](f: Path, options: util.Set[Opt], startPosition: Long): javadsl.Sink[ByteString, CompletionStage[IOResult]] = + new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet, startPosition).toCompletionStage()) + /** * Creates a Source from a files contents. * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes, @@ -143,4 +159,22 @@ object FileIO { */ def fromPath(f: Path, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] = new Source(scaladsl.FileIO.fromPath(f, chunkSize).toCompletionStage()) + + /** + * Creates a synchronous Source from a files contents. + * Emitted elements are `chunkSize` sized [[ByteString]] elements, + * except the last element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. + * + * @param f the file path to read from + * @param chunkSize the size of each read operation + * @param startPosition startPosition the start position to read from, defaults to 0 + */ + def fromPath(f: Path, chunkSize: Int, startPosition: Long): javadsl.Source[ByteString, CompletionStage[IOResult]] = + new Source(scaladsl.FileIO.fromPath(f, chunkSize, startPosition).toCompletionStage()) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index 73430d2194..81b6fbe0b7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -55,7 +55,25 @@ object FileIO { * @param chunkSize the size of each read operation, defaults to 8192 */ def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = - Source.fromGraph(new FileSource(f, chunkSize, DefaultAttributes.fileSource, sourceShape("FileSource"))) + fromPath(f, chunkSize, startPosition = 0) + + /** + * Creates a Source from a files contents. + * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, + * except the final element, which will be up to `chunkSize` in size. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. + * + * @param f the file path to read from + * @param chunkSize the size of each read operation, defaults to 8192 + * @param startPosition the start position to read from + */ + def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] = + Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource"))) /** * Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files by default. @@ -86,5 +104,21 @@ object FileIO { * @param options File open options, defaults to Set(WRITE, CREATE) */ def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] = - Sink.fromGraph(new FileSink(f, options, DefaultAttributes.fileSink, sinkShape("FileSink"))) + toPath(f, options, startPosition = 0) + + /** + * Creates a Sink which writes incoming [[ByteString]] elements to the given file path. Overwrites existing files by default. + * + * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion, + * and a possible exception if IO operation was not completed successfully. + * + * This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`, + * unless configured otherwise by using [[ActorAttributes]]. + * + * @param f the file path to write to + * @param options File open options, defaults to Set(WRITE, CREATE) + * @param startPosition the start position to write to + */ + def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] = + Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink"))) } diff --git a/project/MiMa.scala b/project/MiMa.scala index ec87eec54d..f272134a6c 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1118,6 +1118,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.DaemonMsgCreateSerializer.serialization"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.DaemonMsgCreateSerializer.this"), + // #22657 changes to internal classes + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FilePublisher.props"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FilePublisher.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSink.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSource.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSubscriber.props"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSubscriber.this"), + // Internal MessageBuffer for actors ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.PerGroupingBuffer.akka$cluster$pubsub$PerGroupingBuffer$$buffers"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.PerGroupingBuffer.akka$cluster$pubsub$PerGroupingBuffer$_setter_$akka$cluster$pubsub$PerGroupingBuffer$$buffers_="),