=str 21732 rewrite file source with NIO and GraphStage

add MiMa excludes and additional test

add benchmark and move exclusions to 2.5.4

add future completion on stream termination
Alexander Golubev 2016-09-21 23:42:27 +02:00 committed by Patrik Nordwall
parent e57cab6c85
commit 000ed93576
7 changed files with 259 additions and 164 deletions
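The headline behavioral change: FileIO.fromPath is reimplemented as a GraphStageWithMaterializedValue doing NIO FileChannel reads, and its materialized Future[IOResult] now completes even when the stream is terminated abruptly. The following minimal sketch of that guarantee is not part of the commit; "some-file.txt" is a placeholder path that must exist:

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, IOResult }
import akka.stream.scaladsl.{ FileIO, Keep, Sink }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object AbruptTerminationExample extends App {
  implicit val system = ActorSystem("example")
  val mat = ActorMaterializer()

  // Keep the source's materialized Future[IOResult] (Keep.left),
  // not Sink.ignore's Future[Done].
  val ioResult: Future[IOResult] =
    FileIO.fromPath(Paths.get("some-file.txt"), chunkSize = 512)
      .toMat(Sink.ignore)(Keep.left)
      .run()(mat)

  mat.shutdown() // abrupt termination of the materializer
  // Before this commit the future could be left uncompleted; with the
  // GraphStage rewrite it completes (see the new spec further down).
  println(Await.result(ioResult, 3.seconds))
  system.terminate()
}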

View file

@@ -0,0 +1,84 @@
/**
 * Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.stream.io

import java.nio.file.{ Files, Path }
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ ActorMaterializer, Attributes, IOResult }
import akka.util.ByteString
import akka.{ Done, NotUsed }
import org.openjdk.jmh.annotations.{ BenchmarkMode, OutputTimeUnit, Scope, State, _ }

import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.AverageTime))
@Fork(1)
@Threads(1)
@Warmup(iterations = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class FileSourcesScaleBenchmark {

  /**
   * Benchmark                               (bufSize)  Mode  Cnt  Score   Error  Units
   * FileSourcesScaleBenchmark.flatMapMerge       2048  avgt   10  1.587 ± 0.118   s/op
   * FileSourcesScaleBenchmark.mapAsync           2048  avgt   10  0.899 ± 0.103   s/op
   */
  implicit val system = ActorSystem("file-sources-benchmark")
  implicit val materializer = ActorMaterializer()

  val FILES_NUMBER = 40
  val files: Seq[Path] = {
    val line = ByteString("x" * 2048 + "\n")
    (1 to FILES_NUMBER).map(i => {
      val f = Files.createTempFile(getClass.getName, i + ".bench.tmp")

      val ft = Source.fromIterator(() => Iterator.continually(line))
        .take(20000) // adjust as needed
        .runWith(FileIO.toPath(f))
      Await.result(ft, 300.seconds)
      f
    })
  }

  @Param(Array("2048"))
  var bufSize = 0

  var fileChannelSource: Seq[Source[ByteString, Future[IOResult]]] = _

  @Setup
  def setup(): Unit = {
    fileChannelSource = files.map(FileIO.fromPath(_, bufSize))
  }

  @TearDown
  def teardown(): Unit = {
    files.foreach(Files.delete)
  }

  @TearDown
  def shutdown(): Unit = {
    Await.result(system.terminate(), Duration.Inf)
  }

  @Benchmark
  def flatMapMerge(): Unit = {
    val h = Source.fromIterator(() => files.iterator)
      .flatMapMerge(FILES_NUMBER, path => FileIO.fromPath(path, bufSize)).runWith(Sink.ignore)
    Await.result(h, 300.seconds)
  }

  @Benchmark
  def mapAsync(): Unit = {
    val h = Source.fromIterator(() => files.iterator)
      .mapAsync(FILES_NUMBER)(path => FileIO.fromPath(path, bufSize).runWith(Sink.ignore)).runWith(Sink.ignore)
    Await.result(h, 300.seconds)
  }
}
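For context on the two strategies benchmarked above (mapAsync scored faster in the header comment): flatMapMerge funnels every file's byte chunks into one downstream, while mapAsync drains each file in its own inner stream and passes only the completion value on. A sketch of that shape difference, not part of the commit, assuming an implicit Materializer is in scope:

import java.nio.file.Path
import akka.{ Done, NotUsed }
import akka.stream.Materializer
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.util.ByteString

// Chunks from up to 4 files at a time are merged into one downstream.
def merged(files: List[Path]): Source[ByteString, NotUsed] =
  Source(files).flatMapMerge(breadth = 4, path => FileIO.fromPath(path))

// Each file is drained by its own inner stream; only completion flows on.
def perFile(files: List[Path])(implicit mat: Materializer): Source[Done, NotUsed] =
  Source(files).mapAsync(parallelism = 4)(path => FileIO.fromPath(path).runWith(Sink.ignore))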

View file

@@ -3,25 +3,22 @@
  */
 package akka.stream.io

-import java.nio.file.{ FileSystems, Files }
 import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
 import java.util.Random

 import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
-import akka.stream.ActorMaterializerSettings
-import akka.stream.ActorAttributes
-import akka.stream.Attributes
+import akka.stream.IOResult._
+import akka.stream._
 import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
 import akka.stream.impl.StreamSupervisor.Children
 import akka.stream.io.FileSourceSpec.Settings
 import akka.stream.scaladsl.{ FileIO, Keep, Sink }
-import akka.stream.testkit._
 import akka.stream.testkit.Utils._
+import akka.stream.testkit._
 import akka.stream.testkit.scaladsl.TestSink
 import akka.testkit.TestDuration
 import akka.util.ByteString
-import akka.util.Timeout
 import com.google.common.jimfs.{ Configuration, Jimfs }

 import scala.concurrent.Await
@@ -75,16 +72,16 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
   "FileSource" must {
     "read contents from a file" in assertAllStagesStopped {
       val chunkSize = 512
-      val bufferAttributes = Attributes.inputBuffer(1, 2)

       val p = FileIO.fromPath(testFile, chunkSize)
-        .withAttributes(bufferAttributes)
+        .addAttributes(Attributes.inputBuffer(1, 2))
         .runWith(Sink.asPublisher(false))
       val c = TestSubscriber.manualProbe[ByteString]()
       p.subscribe(c)
       val sub = c.expectSubscription()

       var remaining = TestText
       def nextChunk() = {
         val (chunk, rest) = remaining.splitAt(chunkSize)
         remaining = rest
@@ -108,6 +105,18 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
       c.expectComplete()
     }

+    "complete future even when abrupt termination happened" in {
+      val chunkSize = 512
+      val mat = ActorMaterializer()
+      val (future, p) = FileIO.fromPath(testFile, chunkSize)
+        .addAttributes(Attributes.inputBuffer(1, 2))
+        .toMat(TestSink.probe)(Keep.both).run()(mat)
+
+      p.request(1)
+      p.expectNext().utf8String should ===(TestText.splitAt(chunkSize)._1)
+      mat.shutdown()
+      Await.result(future, 3.seconds) === createSuccessful(chunkSize)
+    }
+
     "read partial contents from a file" in assertAllStagesStopped {
       val chunkSize = 512
       val startPosition = 1000
@@ -119,8 +128,8 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
       val c = TestSubscriber.manualProbe[ByteString]()
       p.subscribe(c)
       val sub = c.expectSubscription()

       var remaining = TestText.drop(1000)
       def nextChunk() = {
         val (chunk, rest) = remaining.splitAt(chunkSize)
         remaining = rest
@@ -132,18 +141,27 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
       for (_ <- 1 to 10) {
         c.expectNext().utf8String should ===(nextChunk().toString)
       }
       c.expectComplete()
     }

+    "be able to read not whole file" in {
+      val chunkSize = 512
+      val (future, p) = FileIO.fromPath(testFile, chunkSize)
+        .addAttributes(Attributes.inputBuffer(1, 2))
+        .toMat(TestSink.probe)(Keep.both).run()
+
+      p.request(1)
+      p.expectNext().utf8String should ===(TestText.splitAt(chunkSize)._1)
+      p.cancel()
+      Await.result(future, 3.seconds) === createSuccessful(chunkSize)
+    }
+
     "complete only when all contents of a file have been signalled" in assertAllStagesStopped {
       val chunkSize = 256
-      val bufferAttributes = Attributes.inputBuffer(4, 8)

       val demandAllButOneChunks = TestText.length / chunkSize - 1
       val p = FileIO.fromPath(testFile, chunkSize)
-        .withAttributes(bufferAttributes)
+        .addAttributes(Attributes.inputBuffer(4, 8))
         .runWith(Sink.asPublisher(false))

       val c = TestSubscriber.manualProbe[ByteString]()
@@ -177,7 +195,7 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
       c.expectSubscription()
       c.expectError()
-      val ioResult = Await.result(r, 3.seconds.dilated).status.isFailure shouldBe true
+      Await.result(r, 3.seconds.dilated).status.isFailure shouldBe true
     }

     List(
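A note on the helper the new assertions lean on: createSuccessful comes from the new import akka.stream.IOResult._ and simply wraps a byte count in a successful result. A small illustration, not part of the commit:

import akka.Done
import akka.stream.IOResult
import scala.util.Success

// IOResult.createSuccessful(n) is shorthand for IOResult(n, Success(Done))
val expected: IOResult = IOResult.createSuccessful(512)
assert(expected == IOResult(512, Success(Done)))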

View file

@@ -48,6 +48,19 @@ class SinkAsJavaStreamSpec extends StreamSpec(UnboundedMailboxConfig) {
       javaSource.close()
     }

+    "allow overriding the dispatcher using Attributes" in Utils.assertAllStagesStopped {
+      val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
+      val materializer = ActorMaterializer()(sys)
+
+      try {
+        TestSource.probe[ByteString].runWith(StreamConverters.asJavaStream()
+          .addAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")))(materializer)
+        materializer.asInstanceOf[PhasedFusingActorMaterializer].supervisor.tell(StreamSupervisor.GetChildren, testActor)
+        val ref = expectMsgType[Children].children.find(_.path.toString contains "asJavaStream").get
+        assertDispatcher(ref, "akka.actor.default-dispatcher")
+      } finally shutdown(sys)
+    }
+
     "work in separate IO dispatcher" in Utils.assertAllStagesStopped {
       val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
       val materializer = ActorMaterializer()(sys)
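The attribute mechanism this test exercises is general: any stage can be pinned onto a dispatcher by name with ActorAttributes.dispatcher. A sketch, not part of the commit; "my-app.blocking-io-dispatcher" is a hypothetical dispatcher that would have to be defined in application.conf:

import java.nio.file.Paths
import akka.stream.ActorAttributes
import akka.stream.scaladsl.FileIO

// Run the blocking file reads on a dedicated dispatcher instead of the
// default one, keeping the rest of the stream unaffected.
val pinned = FileIO.fromPath(Paths.get("data.bin"))
  .addAttributes(ActorAttributes.dispatcher("my-app.blocking-io-dispatcher"))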

View file

@@ -0,0 +1,11 @@
# #21732 Migrate JavaSource to GraphStage
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.io.FileSource")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSource.newInstance")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.FileSource.withAttributes")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSource.attributes")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSource.label")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSource.create")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.FileSource.this")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FilePublisher$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FilePublisher$Continue$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FilePublisher")
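These filters are safe because everything excluded lives under akka.stream.impl, which is internal API. For context, a sketch of how the sbt MiMa plugin consumes such filters; the coordinates and version below are illustrative, not taken from the commit:

// build.sbt sketch: MiMa compares the current build against a previous
// release and fails on any binary incompatibility not covered by a
// ProblemFilters exclude like the ones above.
mimaPreviousArtifacts := Set("com.typesafe.akka" %% "akka-stream" % "2.5.3")
// run with: sbt mimaReportBinaryIssues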

View file

@@ -1,123 +0,0 @@
/**
 * Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.stream.impl.io

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Path

import akka.Done
import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisherMessage
import akka.stream.IOResult
import akka.util.ByteString

import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal

/** INTERNAL API */
@InternalApi private[akka] object FilePublisher {
  def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, startPosition: Long, initialBuffer: Int, maxBuffer: Int) = {
    require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
    require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)")
    require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)")
    require(maxBuffer >= initialBuffer, s"maxBuffer must be >= initialBuffer (was $maxBuffer)")

    Props(classOf[FilePublisher], f, completionPromise, chunkSize, startPosition, initialBuffer, maxBuffer)
      .withDeploy(Deploy.local)
  }

  private case object Continue extends DeadLetterSuppression

  val Read = java.util.Collections.singleton(java.nio.file.StandardOpenOption.READ)
}

/** INTERNAL API */
@InternalApi private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, startPosition: Long, initialBuffer: Int, maxBuffer: Int)
  extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging {
  import FilePublisher._

  var eofReachedAtOffset = Long.MinValue

  val buf = ByteBuffer.allocate(chunkSize)
  var readBytesTotal = 0L
  var availableChunks: Vector[ByteString] = Vector.empty // TODO possibly resign read-ahead-ing and make fusable as Stage

  private var chan: FileChannel = _

  override def preStart() = {
    try {
      chan = FileChannel.open(f, FilePublisher.Read)
      if (startPosition > 0) {
        chan.position(startPosition)
      }
    } catch {
      case NonFatal(ex) =>
        completionPromise.trySuccess(IOResult(0L, Failure(ex)))
        onErrorThenStop(ex)
    }

    super.preStart()
  }

  def receive = {
    case ActorPublisherMessage.Request(elements) => readAndSignal(maxBuffer)
    case Continue                                => readAndSignal(maxBuffer)
    case ActorPublisherMessage.Cancel            => context.stop(self)
  }

  def readAndSignal(maxReadAhead: Int): Unit =
    if (isActive) {
      // Write previously buffered, then refill buffer
      availableChunks = readAhead(maxReadAhead, signalOnNexts(availableChunks))
      if (totalDemand > 0 && isActive) self ! Continue
    }

  @tailrec private def signalOnNexts(chunks: Vector[ByteString]): Vector[ByteString] =
    if (chunks.nonEmpty && totalDemand > 0) {
      onNext(chunks.head)
      signalOnNexts(chunks.tail)
    } else {
      if (chunks.isEmpty && eofEncountered)
        onCompleteThenStop()
      chunks
    }

  /** BLOCKING I/O READ */
  @tailrec def readAhead(maxChunks: Int, chunks: Vector[ByteString]): Vector[ByteString] =
    if (chunks.size <= maxChunks && isActive && !eofEncountered) {
      (try chan.read(buf) catch { case NonFatal(ex) => onErrorThenStop(ex); Int.MinValue }) match {
        case -1 => // EOF
          eofReachedAtOffset = chan.position
          log.debug("No more bytes available to read (got `-1` from `read`), marking final bytes of file @ " + eofReachedAtOffset)
          chunks
        case 0            => readAhead(maxChunks, chunks) // had nothing to read into this chunk
        case Int.MinValue => Vector.empty // read failed, we're done here
        case readBytes =>
          buf.flip()
          readBytesTotal += readBytes
          val newChunks = chunks :+ ByteString.fromByteBuffer(buf)
          buf.clear()
          readAhead(maxChunks, newChunks)
      }
    } else chunks

  private def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue

  override def postStop(): Unit = {
    super.postStop()
    try {
      if (chan ne null) chan.close()
    } catch {
      case NonFatal(ex) =>
        completionPromise.trySuccess(IOResult(readBytesTotal, Failure(ex)))
    }
    completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done)))
  }
}
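The deleted publisher had to track demand itself (totalDemand, Continue self-messages) because it was an actor. In the GraphStage model that replaces it, demand arrives as an onPull callback instead. A minimal sketch of that shape, not part of the commit; the ChunkSource name and the chunk function are hypothetical:

import akka.stream.{ Attributes, Outlet, SourceShape }
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.util.ByteString

// Hypothetical minimal source stage: each downstream pull asks `chunk`
// for the next piece, completing the stage when None is returned.
final class ChunkSource(chunk: () => Option[ByteString]) extends GraphStage[SourceShape[ByteString]] {
  val out = Outlet[ByteString]("ChunkSource.out")
  override val shape = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      setHandler(out, this)
      override def onPull(): Unit = chunk() match {
        case Some(bytes) => push(out, bytes) // one element per pull: backpressure for free
        case None        => completeStage()
      }
    }
}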

View file

@@ -4,48 +4,140 @@
 package akka.stream.impl.io

 import java.io.InputStream
-import java.nio.file.Path
+import java.nio.ByteBuffer
+import java.nio.channels.{ CompletionHandler, FileChannel }
+import java.nio.file.{ Files, Path, StandardOpenOption }

+import akka.Done
 import akka.annotation.InternalApi
-import akka.stream._
-import akka.stream.ActorAttributes.Dispatcher
-import akka.stream.IOResult
-import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
+import akka.stream.Attributes.InputBuffer
+import akka.stream.impl.Stages.DefaultAttributes
 import akka.stream.impl.{ ErrorPublisher, SourceModule }
+import akka.stream.stage._
+import akka.stream.{ IOResult, _ }
 import akka.util.ByteString
-import org.reactivestreams._
+import org.reactivestreams.Publisher

+import scala.annotation.tailrec
 import scala.concurrent.{ Future, Promise }
+import scala.util.control.NonFatal
+import scala.util.{ Failure, Success, Try }

 /**
  * INTERNAL API
- * Creates simple synchronous Source backed by the given file.
  */
-@InternalApi private[akka] final class FileSource(f: Path, chunkSize: Int, startPosition: Long, val attributes: Attributes, shape: SourceShape[ByteString])
-  extends SourceModule[ByteString, Future[IOResult]](shape) {
+private[akka] object FileSource {
+  val completionHandler = new CompletionHandler[Integer, Try[Int] => Unit] {
+
+    override def completed(result: Integer, attachment: Try[Int] => Unit): Unit = {
+      attachment(Success(result))
+    }
+
+    override def failed(ex: Throwable, attachment: Try[Int] => Unit): Unit = {
+      attachment(Failure(ex))
+    }
+  }
+}

+/**
+ * INTERNAL API
+ * Creates simple asynchronous Source backed by the given file.
+ */
+private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: Long)
+  extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
   require(chunkSize > 0, "chunkSize must be greater than 0")
-  require(startPosition >= 0, "startPosition must be equal or greater than 0")

+  val out = Outlet[ByteString]("FileSource.out")
+  override val shape = SourceShape(out)

-  override def create(context: MaterializationContext) = {
-    // FIXME rewrite to be based on GraphStage rather than dangerous downcasts
-    val materializer = ActorMaterializerHelper.downcast(context.materializer)
-    val settings = materializer.effectiveSettings(context.effectiveAttributes)
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
     val ioResultPromise = Promise[IOResult]()
-    val props = FilePublisher.props(f, ioResultPromise, chunkSize, startPosition, settings.initialInputBufferSize, settings.maxInputBufferSize)
-    val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher

-    val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
+    val logic = new GraphStageLogic(shape) with OutHandler {
+      handler =>
+      val buffer = ByteBuffer.allocate(chunkSize)
+      val maxReadAhead = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
+      var channel: FileChannel = _
+      var position = startPosition
+      var chunkCallback: Try[Int] => Unit = _
+      var eofEncountered = false
+      var availableChunks: Vector[ByteString] = Vector.empty[ByteString]

-    (akka.stream.actor.ActorPublisher[ByteString](ref), ioResultPromise.future)
+      setHandler(out, this)

+      override def preStart(): Unit = {
+        try {
+          // this is a bit weird but required to keep existing semantics
+          require(Files.exists(path), s"Path '$path' does not exist")
+          require(Files.isRegularFile(path), s"Path '$path' is not a regular file")
+          require(Files.isReadable(path), s"Missing read permission for '$path'")
+
+          channel = FileChannel.open(path, StandardOpenOption.READ)
+          channel.position(position)
+        } catch {
+          case ex: Exception =>
+            ioResultPromise.trySuccess(IOResult(position, Failure(ex)))
+            throw ex
+        }
+      }
+
+      override def onPull(): Unit = {
+        if (availableChunks.size < maxReadAhead && !eofEncountered)
+          availableChunks = readAhead(maxReadAhead, availableChunks)
+
+        //if already read something and try
+        if (availableChunks.nonEmpty) {
+          emitMultiple(out, availableChunks.iterator,
+            () => if (eofEncountered) success() else setHandler(out, handler)
+          )
+          availableChunks = Vector.empty[ByteString]
+        } else if (eofEncountered) success()
+      }
+
+      private def success(): Unit = {
+        completeStage()
+        ioResultPromise.trySuccess(IOResult(position, Success(Done)))
+      }
+
+      /** BLOCKING I/O READ */
+      @tailrec def readAhead(maxChunks: Int, chunks: Vector[ByteString]): Vector[ByteString] =
+        if (chunks.size < maxChunks && !eofEncountered) {
+          val readBytes = try channel.read(buffer, position) catch {
+            case NonFatal(ex) =>
+              failStage(ex)
+              ioResultPromise.trySuccess(IOResult(position, Failure(ex)))
+              throw ex
+          }
+
+          if (readBytes > 0) {
+            buffer.flip()
+            position += readBytes
+            val newChunks = chunks :+ ByteString.fromByteBuffer(buffer)
+            buffer.clear()
+
+            if (readBytes < chunkSize) {
+              eofEncountered = true
+              newChunks
+            } else readAhead(maxChunks, newChunks)
+          } else {
+            eofEncountered = true
+            chunks
+          }
+        } else chunks
+
+      override def onDownstreamFinish(): Unit = success()
+
+      override def postStop(): Unit = {
+        ioResultPromise.trySuccess(IOResult(position, Success(Done)))
+        if ((channel ne null) && channel.isOpen) channel.close()
+      }
+    }
+
+    (logic, ioResultPromise.future)
   }

-  override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[IOResult]] =
-    new FileSource(f, chunkSize, startPosition, attributes, shape)
-
-  override def withAttributes(attr: Attributes): SourceModule[ByteString, Future[IOResult]] =
-    new FileSource(f, chunkSize, startPosition, attr, amendShape(attr))
-
-  override protected def label: String = s"FileSource($f, $chunkSize)"
+  override def toString = s"FileSource($path, $chunkSize)"
 }

 /**
@@ -79,4 +171,4 @@ import scala.concurrent.{ Future, Promise }

   override def withAttributes(attr: Attributes): SourceModule[ByteString, Future[IOResult]] =
     new InputStreamSource(createInputStream, chunkSize, attr, amendShape(attr))
 }

View file

@@ -73,7 +73,7 @@ object FileIO {
    * @param startPosition the start position to read from
    */
   def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =
-    Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))
+    Source.fromGraph(new FileSource(f, chunkSize, startPosition)).withAttributes(DefaultAttributes.fileSource)

   /**
    * Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files
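For completeness, a usage sketch of the public entry point changed above; not part of the commit, and "journal.log" plus the offset are illustrative:

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, IOResult }
import akka.stream.scaladsl.{ FileIO, Keep, Sink }
import scala.concurrent.Future

implicit val system = ActorSystem("example")
implicit val mat = ActorMaterializer()

// Resume reading from byte offset 4096; keep the source's materialized
// IOResult future (Keep.left) rather than Sink.ignore's Future[Done].
val result: Future[IOResult] =
  FileIO.fromPath(Paths.get("journal.log"), chunkSize = 8192, startPosition = 4096L)
    .toMat(Sink.ignore)(Keep.left)
    .run()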