Add startPosition option for FileIO API

* New startPosition parameter for FileIO.toPath
* New startPosition parameter for FileIO.fromPath
* Default startPosition value: 0
* Keep the existing methods binary compatible
* Add MiMa filters for the changed internal classes
* Address review feedback from the PR

Parent: 082507973a
Commit: 33ce3f0b77

9 changed files with 171 additions and 20 deletions
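Before the diffs, a minimal sketch of how the two new overloads are intended to be used from the Scala DSL. The file name, offsets, and the surrounding ActorSystem/materializer setup are assumed for illustration only:

    import java.nio.file.Paths
    import java.nio.file.StandardOpenOption.{ CREATE, WRITE }

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{ FileIO, Sink, Source }
    import akka.util.ByteString

    object StartPositionExample extends App {
      implicit val system: ActorSystem = ActorSystem("example")
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      val path = Paths.get("example.dat") // hypothetical file

      // Read the file, skipping the first 1024 bytes.
      FileIO.fromPath(path, chunkSize = 8192, startPosition = 1024)
        .runWith(Sink.foreach(chunk ⇒ println(chunk.utf8String)))

      // Write starting at byte 2048; WRITE + CREATE do not truncate,
      // so earlier content stays intact.
      Source.single(ByteString("resumed"))
        .runWith(FileIO.toPath(path, Set(WRITE, CREATE), startPosition = 2048))
    }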
In FileSinkSpec:

@@ -3,6 +3,7 @@
  */
 package akka.stream.io

+import java.nio.file.StandardOpenOption.{ CREATE, WRITE }
 import java.nio.file.{ Files, Path, StandardOpenOption }

 import akka.actor.ActorSystem
@@ -102,6 +103,43 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
       }
     }

+    "allow writing from specific position to the file" in assertAllStagesStopped {
+      targetFile { f ⇒
+        val TestLinesCommon = {
+          val b = ListBuffer[String]()
+          b.append("a" * 1000 + "\n")
+          b.append("b" * 1000 + "\n")
+          b.append("c" * 1000 + "\n")
+          b.append("d" * 1000 + "\n")
+          b.toList
+        }
+
+        val commonByteString = TestLinesCommon.map(ByteString(_)).foldLeft[ByteString](ByteString.empty)((acc, line) ⇒ acc ++ line).compact
+        val startPosition = commonByteString.size
+
+        val testLinesPart2: List[String] = {
+          val b = ListBuffer[String]()
+          b.append("x" * 1000 + "\n")
+          b.append("x" * 1000 + "\n")
+          b.toList
+        }
+
+        def write(lines: List[String] = TestLines, startPosition: Long = 0) =
+          Source(lines)
+            .map(ByteString(_))
+            .runWith(FileIO.toPath(f, options = Set(WRITE, CREATE), startPosition = startPosition))
+
+        val completion1 = write()
+        val result1 = Await.result(completion1, 3.seconds)
+
+        val completion2 = write(testLinesPart2, startPosition)
+        val result2 = Await.result(completion2, 3.seconds)
+
+        Files.size(f) should ===(startPosition + result2.count)
+        checkFileContents(f, TestLinesCommon.mkString("") + testLinesPart2.mkString(""))
+      }
+    }
+
     "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
       targetFile { f ⇒
         val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
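Concretely: each common line is "a" * 1000 plus a newline, i.e. 1001 bytes, so startPosition = commonByteString.size = 4 × 1001 = 4004. The second write emits 2 × 1001 = 2002 bytes at that offset, so the final assertions check Files.size(f) == 4004 + 2002 = 6006 and that the file holds the common lines followed by the two "x" lines.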
In FileSourceSpec:

@@ -108,6 +108,34 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
       c.expectComplete()
     }

+    "read partial contents from a file" in assertAllStagesStopped {
+      val chunkSize = 512
+      val startPosition = 1000
+      val bufferAttributes = Attributes.inputBuffer(1, 2)
+
+      val p = FileIO.fromPath(testFile, chunkSize, startPosition)
+        .withAttributes(bufferAttributes)
+        .runWith(Sink.asPublisher(false))
+      val c = TestSubscriber.manualProbe[ByteString]()
+      p.subscribe(c)
+      val sub = c.expectSubscription()
+
+      var remaining = TestText.drop(1000)
+      def nextChunk() = {
+        val (chunk, rest) = remaining.splitAt(chunkSize)
+        remaining = rest
+        chunk
+      }
+
+      sub.request(5000)
+
+      for (_ ← 1 to 10) {
+        c.expectNext().utf8String should ===(nextChunk().toString)
+      }
+
+      c.expectComplete()
+    }
+
     "complete only when all contents of a file have been signalled" in assertAllStagesStopped {
       val chunkSize = 256
       val bufferAttributes = Attributes.inputBuffer(4, 8)
In FilePublisher (akka.stream.impl.io):

@@ -21,12 +21,13 @@ import scala.util.control.NonFatal

 /** INTERNAL API */
 @InternalApi private[akka] object FilePublisher {
-  def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = {
+  def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, startPosition: Long, initialBuffer: Int, maxBuffer: Int) = {
     require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
+    require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)")
     require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)")
     require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)")

-    Props(classOf[FilePublisher], f, completionPromise, chunkSize, initialBuffer, maxBuffer)
+    Props(classOf[FilePublisher], f, completionPromise, chunkSize, startPosition, initialBuffer, maxBuffer)
       .withDeploy(Deploy.local)
   }
@@ -36,7 +37,7 @@ import scala.util.control.NonFatal
 }

 /** INTERNAL API */
-@InternalApi private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int)
+@InternalApi private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, startPosition: Long, initialBuffer: Int, maxBuffer: Int)
   extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging {
   import FilePublisher._

@@ -51,6 +52,9 @@ import scala.util.control.NonFatal
   override def preStart() = {
     try {
       chan = FileChannel.open(f, FilePublisher.Read)
+      if (startPosition > 0) {
+        chan.position(startPosition)
+      }
     } catch {
       case NonFatal(ex) ⇒
         completionPromise.trySuccess(IOResult(0L, Failure(ex)))
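The positioning itself is plain NIO: FileChannel.position(long) moves the channel's cursor once, and subsequent relative reads continue from that offset. A standalone sketch of the mechanism, with a hypothetical file and offset:

    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel
    import java.nio.file.{ Paths, StandardOpenOption }

    val chan = FileChannel.open(Paths.get("data.bin"), StandardOpenOption.READ) // hypothetical file
    chan.position(1000L)               // seek once, before the read loop
    val buf = ByteBuffer.allocate(512)
    val bytesRead = chan.read(buf)     // fills buf starting at file offset 1000
    chan.close()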
In FileSubscriber (akka.stream.impl.io):

@@ -4,7 +4,7 @@
 package akka.stream.impl.io

 import java.nio.channels.FileChannel
-import java.nio.file.{ OpenOption, Path, StandardOpenOption }
+import java.nio.file.{ Path, OpenOption }

 import akka.Done
 import akka.actor.{ ActorLogging, Deploy, Props }
@@ -19,14 +19,15 @@ import scala.util.{ Failure, Success }

 /** INTERNAL API */
 @InternalApi private[akka] object FileSubscriber {
-  def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[OpenOption]) = {
+  def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, startPosition: Long, openOptions: Set[OpenOption]) = {
     require(bufSize > 0, "buffer size must be > 0")
-    Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local)
+    require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)")
+    Props(classOf[FileSubscriber], f, completionPromise, bufSize, startPosition, openOptions).withDeploy(Deploy.local)
   }
 }

 /** INTERNAL API */
-@InternalApi private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption])
+@InternalApi private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, startPosition: Long, openOptions: Set[OpenOption])
   extends akka.stream.actor.ActorSubscriber
   with ActorLogging {
@@ -38,6 +39,9 @@ import scala.util.{ Failure, Success }

   override def preStart(): Unit = try {
     chan = FileChannel.open(f, openOptions.asJava)
+    if (startPosition > 0) {
+      chan.position(startPosition)
+    }

     super.preStart()
   } catch {
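The write side mirrors this, with one subtlety the FileSinkSpec test above relies on: the default open options are WRITE and CREATE, without TRUNCATE_EXISTING, so opening the file keeps its existing bytes, and seeking to startPosition overwrites only from that offset onward. A plain-NIO sketch of the same behaviour (hypothetical file; the offset echoes the test's 4 × 1001-byte prefix):

    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel
    import java.nio.file.Paths
    import java.nio.file.StandardOpenOption.{ CREATE, WRITE }

    val out = FileChannel.open(Paths.get("data.bin"), WRITE, CREATE) // no TRUNCATE_EXISTING
    out.position(4004L)                            // bytes 0..4003 stay intact
    out.write(ByteBuffer.wrap("new tail".getBytes("UTF-8")))
    out.close()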
In FileSink (akka.stream.impl.io):

@@ -4,7 +4,7 @@
 package akka.stream.impl.io

 import java.io.OutputStream
-import java.nio.file.{ OpenOption, Path, StandardOpenOption }
+import java.nio.file.{ Path, OpenOption }

 import akka.annotation.InternalApi
 import akka.stream._
@@ -21,7 +21,7 @@ import scala.concurrent.{ Future, Promise }
  * Creates simple synchronous Sink which writes all incoming elements to the given file
  * (creating it before hand if necessary).
  */
-@InternalApi private[akka] final class FileSink(f: Path, options: immutable.Set[OpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
+@InternalApi private[akka] final class FileSink(f: Path, startPosition: Long, options: immutable.Set[OpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
   extends SinkModule[ByteString, Future[IOResult]](shape) {

   override protected def label: String = s"FileSink($f, $options)"
@@ -31,7 +31,7 @@ import scala.concurrent.{ Future, Promise }
     val settings = materializer.effectiveSettings(context.effectiveAttributes)

     val ioResultPromise = Promise[IOResult]()
-    val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, options)
+    val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, startPosition, options)
     val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher

     val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
@@ -39,10 +39,10 @@ import scala.concurrent.{ Future, Promise }
   }

   override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] =
-    new FileSink(f, options, attributes, shape)
+    new FileSink(f, startPosition, options, attributes, shape)

   override def withAttributes(attr: Attributes): SinkModule[ByteString, Future[IOResult]] =
-    new FileSink(f, options, attr, amendShape(attr))
+    new FileSink(f, startPosition, options, attr, amendShape(attr))
 }

 /**
In FileSource (akka.stream.impl.io):

@@ -21,16 +21,17 @@ import scala.concurrent.{ Future, Promise }
  * INTERNAL API
  * Creates simple synchronous Source backed by the given file.
  */
-@InternalApi private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
+@InternalApi private[akka] final class FileSource(f: Path, chunkSize: Int, startPosition: Long, val attributes: Attributes, shape: SourceShape[ByteString])
   extends SourceModule[ByteString, Future[IOResult]](shape) {
   require(chunkSize > 0, "chunkSize must be greater than 0")
+  require(startPosition >= 0, "startPosition must be equal or greater than 0")
   override def create(context: MaterializationContext) = {
     // FIXME rewrite to be based on GraphStage rather than dangerous downcasts
     val materializer = ActorMaterializerHelper.downcast(context.materializer)
     val settings = materializer.effectiveSettings(context.effectiveAttributes)

     val ioResultPromise = Promise[IOResult]()
-    val props = FilePublisher.props(f, ioResultPromise, chunkSize, settings.initialInputBufferSize, settings.maxInputBufferSize)
+    val props = FilePublisher.props(f, ioResultPromise, chunkSize, startPosition, settings.initialInputBufferSize, settings.maxInputBufferSize)
     val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher

     val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
@@ -39,10 +40,10 @@ import scala.concurrent.{ Future, Promise }
   }

   override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[IOResult]] =
-    new FileSource(f, chunkSize, attributes, shape)
+    new FileSource(f, chunkSize, startPosition, attributes, shape)

   override def withAttributes(attr: Attributes): SourceModule[ByteString, Future[IOResult]] =
-    new FileSource(f, chunkSize, attr, amendShape(attr))
+    new FileSource(f, chunkSize, startPosition, attr, amendShape(attr))

   override protected def label: String = s"FileSource($f, $chunkSize)"
 }
In akka.stream.javadsl.FileIO:

@@ -20,7 +20,7 @@ object FileIO {

   /**
    * Creates a Sink that writes incoming [[ByteString]] elements to the given file.
-   * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]].
+   * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[OpenOption])]].
    *
    * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
    * and a possible exception if IO operation was not completed successfully.
@@ -35,7 +35,7 @@ object FileIO {

   /**
    * Creates a Sink that writes incoming [[ByteString]] elements to the given file path.
-   * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[StandardOpenOption])]].
+   * Overwrites existing files, if you want to append to an existing file use [[#file(Path, util.Set[OpenOption])]].
    *
    * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
    * and a possible exception if IO operation was not completed successfully.
@@ -79,6 +79,22 @@ object FileIO {
   def toPath[Opt <: OpenOption](f: Path, options: util.Set[Opt]): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
     new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet).toCompletionStage())

+  /**
+   * Creates a Sink that writes incoming [[ByteString]] elements to the given file path.
+   *
+   * Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
+   * and a possible exception if IO operation was not completed successfully.
+   *
+   * You can configure the default dispatcher for this Sink by changing the `akka.stream.blocking-io-dispatcher` or
+   * set it for a given Sink by using [[ActorAttributes]].
+   *
+   * @param f The file path to write to
+   * @param options File open options
+   * @param startPosition the start position to write to, defaults to 0
+   */
+  def toPath[Opt <: OpenOption](f: Path, options: util.Set[Opt], startPosition: Long): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
+    new Sink(scaladsl.FileIO.toPath(f, options.asScala.toSet, startPosition).toCompletionStage())
+
   /**
    * Creates a Source from a files contents.
    * Emitted elements are [[ByteString]] elements, chunked by default by 8192 bytes,
@@ -143,4 +159,22 @@ object FileIO {
    */
   def fromPath(f: Path, chunkSize: Int): javadsl.Source[ByteString, CompletionStage[IOResult]] =
     new Source(scaladsl.FileIO.fromPath(f, chunkSize).toCompletionStage())
+
+  /**
+   * Creates a synchronous Source from a files contents.
+   * Emitted elements are `chunkSize` sized [[ByteString]] elements,
+   * except the last element, which will be up to `chunkSize` in size.
+   *
+   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
+   * and a possible exception if IO operation was not completed successfully.
+   *
+   * @param f the file path to read from
+   * @param chunkSize the size of each read operation
+   * @param startPosition the start position to read from, defaults to 0
+   */
+  def fromPath(f: Path, chunkSize: Int, startPosition: Long): javadsl.Source[ByteString, CompletionStage[IOResult]] =
+    new Source(scaladsl.FileIO.fromPath(f, chunkSize, startPosition).toCompletionStage())
 }
In akka.stream.scaladsl.FileIO:

@@ -55,7 +55,25 @@ object FileIO {
    * @param chunkSize the size of each read operation, defaults to 8192
    */
   def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
-    Source.fromGraph(new FileSource(f, chunkSize, DefaultAttributes.fileSource, sourceShape("FileSource")))
+    fromPath(f, chunkSize, startPosition = 0)
+
+  /**
+   * Creates a Source from a files contents.
+   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
+   * except the final element, which will be up to `chunkSize` in size.
+   *
+   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
+   * set it for a given Source by using [[ActorAttributes]].
+   *
+   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
+   * and a possible exception if IO operation was not completed successfully.
+   *
+   * @param f the file path to read from
+   * @param chunkSize the size of each read operation, defaults to 8192
+   * @param startPosition the start position to read from
+   */
+  def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =
+    Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))

   /**
    * Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files by default.
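Note the shape of the change above: rather than adding a startPosition default parameter to the existing fromPath signature, the commit introduces an overload and turns the old method into a forwarder. Default arguments are a source-level feature only, so changing the old signature would change its bytecode descriptor and break binary compatibility, while an overload leaves the old descriptor untouched. A schematic sketch of the pattern, with simplified names and a stubbed body:

    object FileIOish {
      // Old entry point: still compiles to fromPath(Path, Int), as before.
      def fromPath(f: java.nio.file.Path, chunkSize: Int = 8192): Unit =
        fromPath(f, chunkSize, startPosition = 0)

      // New overload carries the extra parameter.
      def fromPath(f: java.nio.file.Path, chunkSize: Int, startPosition: Long): Unit =
        () // the real implementation builds the FileSource here
    }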
@@ -86,5 +104,21 @@ object FileIO {
    * @param options File open options, defaults to Set(WRITE, CREATE)
    */
   def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] =
-    Sink.fromGraph(new FileSink(f, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
+    toPath(f, options, startPosition = 0)
+
+  /**
+   * Creates a Sink which writes incoming [[ByteString]] elements to the given file path. Overwrites existing files by default.
+   *
+   * Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
+   * and a possible exception if IO operation was not completed successfully.
+   *
+   * This sink is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
+   * unless configured otherwise by using [[ActorAttributes]].
+   *
+   * @param f the file path to write to
+   * @param options File open options, defaults to Set(WRITE, CREATE)
+   * @param startPosition the start position to write to
+   */
+  def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =
+    Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
 }
In the MiMa build definition (object MiMa):

@@ -1118,6 +1118,14 @@ object MiMa extends AutoPlugin {
       ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.DaemonMsgCreateSerializer.serialization"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.DaemonMsgCreateSerializer.this"),

+      // #22657 changes to internal classes
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FilePublisher.props"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FilePublisher.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSink.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSource.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSubscriber.props"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSubscriber.this"),
+
       // Internal MessageBuffer for actors
       ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.PerGroupingBuffer.akka$cluster$pubsub$PerGroupingBuffer$$buffers"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.PerGroupingBuffer.akka$cluster$pubsub$PerGroupingBuffer$_setter_$akka$cluster$pubsub$PerGroupingBuffer$$buffers_="),
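For reference, filters like these are what sbt-mima consumes; roughly (a sketch of the typical wiring, not necessarily this build's exact structure), they are collected into the mimaBinaryIssueFilters setting. Excluding them is safe here because the affected classes are @InternalApi private[akka] and not part of the supported API surface:

    import com.typesafe.tools.mima.core._
    import com.typesafe.tools.mima.plugin.MimaKeys.mimaBinaryIssueFilters

    // Internal classes changed by #22657: user code cannot link against
    // them, so the missing-method problems are acceptable.
    mimaBinaryIssueFilters ++= Seq(
      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FilePublisher.props"),
      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSubscriber.props")
    )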