Re-write file sink as a graph stage (#27247)

* Re-write file sink as a graph stage

Refs #26187
This commit is contained in:
Christopher Batey 2019-07-08 08:54:59 +01:00 committed by GitHub
parent 47c65d266a
commit d4167bc930
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 148 additions and 186 deletions

View file

@ -306,9 +306,9 @@ and then it will behave as in Akka 2.5.x:
akka.coordinated-shutdown.run-by-actor-system-terminate = off
```
### IOSources
### IOSources & FileIO
`StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required
`FileIO.toPath` and `StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required
to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].

View file

@ -7,26 +7,26 @@ package akka.stream.io
import java.nio.file.StandardOpenOption.{ CREATE, WRITE }
import java.nio.file._
import akka.actor.ActorSystem
import akka.dispatch.{ Dispatchers, ExecutionContexts }
import akka.stream.impl.PhasedFusingActorMaterializer
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.dispatch.ExecutionContexts
import akka.stream._
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.scaladsl.{ FileIO, Keep, Sink, Source }
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.util.ByteString
import com.github.ghik.silencer.silent
import com.google.common.jimfs.{ Configuration, Jimfs }
import org.scalatest.concurrent.ScalaFutures
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import com.github.ghik.silencer.silent
import scala.concurrent.{ Await, Future }
import scala.util.Success
@silent
class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
@ -159,40 +159,40 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
targetFile { f =>
val sys = ActorSystem("FileSinkSpec-dispatcher-testing-1", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
val forever = Source.maybe.toMat(FileIO.toPath(f))(Keep.left).run()
try {
Source.fromIterator(() => Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer)
materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get
// haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever
assertDispatcher(ref, Dispatchers.DefaultBlockingDispatcherId)
} finally shutdown(sys)
val children = expectMsgType[Children]
val ref = withClue(children) {
val fileSink = children.children.find(_.path.toString contains "fileSink")
fileSink shouldBe defined
fileSink.get
}
assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
} finally {
forever.complete(Success(None))
}
}
}
"allow overriding the dispatcher using Attributes" in assertAllStagesStopped {
targetFile { f =>
val sys = ActorSystem("FileSinkSpec-dispatcher-testing-2", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
val forever = Source.maybe
.toMat(FileIO.toPath(f).addAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")))(Keep.left)
.run()
try {
Source
.fromIterator(() => Iterator.continually(TestByteStrings.head))
.to(FileIO.toPath(f).addAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")))
.run()(materializer)
materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get
assertDispatcher(ref, "akka.actor.default-dispatcher")
} finally shutdown(sys)
} finally {
forever.complete(Success(None))
}
}
}
@ -209,22 +209,23 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
}(ExecutionContexts.sameThreadExecutionContext)))
Await.result(completion, 3.seconds)
checkFileContents(f, TestLines.head)
}
}
"complete materialized future with an exception when upstream fails" in assertAllStagesStopped {
val te = TE("oh no")
targetFile { f =>
val completion = Source(TestByteStrings)
.map { bytes =>
if (bytes.contains('b')) throw new Error("bees!")
if (bytes.contains('b')) throw te
bytes
}
.runWith(FileIO.toPath(f))
val ex = intercept[AbruptIOTerminationException] { Await.result(completion, 3.seconds) }
ex.ioResult.count should equal(1001)
val ex = intercept[IOOperationIncompleteException] { Await.result(completion, 3.seconds) }
ex.count should equal(1001)
ex.getCause should equal(te)
checkFileContents(f, TestLines.takeWhile(!_.contains('b')).mkString(""))
}
}
@ -233,7 +234,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
val completion =
Source.single(ByteString("42")).runWith(FileIO.toPath(fs.getPath("/I/hope/this/file/doesnt/exist.txt")))
completion.failed.futureValue shouldBe an[NoSuchFileException]
completion.failed.futureValue.getCause shouldBe an[NoSuchFileException]
}
}

View file

@ -108,6 +108,9 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPubl
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSinkActor$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSink")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSinkActor")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSink")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber")
# #25045 adding Java/Scala interop to SourceQueue and SinkQueue

View file

@ -0,0 +1,107 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.io
import java.nio.channels.FileChannel
import java.nio.file.{ OpenOption, Path }
import scala.collection.immutable
import akka.annotation.InternalApi
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.{
AbruptStageTerminationException,
Attributes,
IOOperationIncompleteException,
IOResult,
Inlet,
SinkShape
}
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
import akka.util.ByteString
import akka.util.ccompat.JavaConverters._
import scala.concurrent.{ Future, Promise }
import scala.util.Success
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class FileOutputStage(path: Path, startPosition: Long, openOptions: immutable.Set[OpenOption])
extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] {
val in: Inlet[ByteString] = Inlet("FileSink")
override def shape: SinkShape[ByteString] = SinkShape(in)
override def initialAttributes: Attributes = DefaultAttributes.fileSink
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
val mat = Promise[IOResult]
val logic = new GraphStageLogic(shape) with InHandler {
private var chan: FileChannel = _
private var bytesWritten: Long = 0
override def preStart(): Unit = {
try {
chan = FileChannel.open(path, openOptions.asJava)
if (startPosition > 0) {
chan.position(startPosition)
}
pull(in)
} catch {
case NonFatal(t) =>
closeFile(Some(new IOOperationIncompleteException(bytesWritten, t)))
failStage(t)
}
}
override def onPush(): Unit = {
val next = grab(in)
try {
bytesWritten += chan.write(next.asByteBuffer)
pull(in)
} catch {
case NonFatal(t) =>
closeFile(Some(new IOOperationIncompleteException(bytesWritten, t)))
failStage(t)
}
}
override def onUpstreamFailure(t: Throwable): Unit = {
closeFile(Some(new IOOperationIncompleteException(bytesWritten, t)))
failStage(t)
}
override def onUpstreamFinish(): Unit = {
closeFile(None)
completeStage()
}
override def postStop(): Unit = {
if (!mat.isCompleted) {
val failure = new AbruptStageTerminationException(this)
closeFile(Some(failure))
mat.tryFailure(failure)
}
}
private def closeFile(failed: Option[Throwable]): Unit = {
try {
if (chan ne null) chan.close()
failed match {
case Some(t) => mat.tryFailure(t)
case None => mat.tryComplete(Success(IOResult(bytesWritten)))
}
} catch {
case NonFatal(t) =>
mat.tryFailure(failed.getOrElse(t))
}
}
setHandler(in, this)
}
(logic, mat.future)
}
}

View file

@ -1,108 +0,0 @@
/*
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.io
import java.nio.channels.FileChannel
import java.nio.file.{ OpenOption, Path }
import akka.Done
import akka.actor.{ ActorLogging, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.{ AbruptIOTerminationException, IOResult }
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
import akka.util.ByteString
import com.github.ghik.silencer.silent
import akka.util.ccompat.JavaConverters._
import scala.concurrent.Promise
import scala.util.{ Failure, Success, Try }
/** INTERNAL API */
@InternalApi private[akka] object FileSubscriber {
def props(
f: Path,
completionPromise: Promise[IOResult],
bufSize: Int,
startPosition: Long,
openOptions: Set[OpenOption]) = {
require(bufSize > 0, "buffer size must be > 0")
require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)")
Props(classOf[FileSubscriber], f, completionPromise, bufSize, startPosition, openOptions).withDeploy(Deploy.local)
}
}
/** INTERNAL API */
@silent
@InternalApi private[akka] class FileSubscriber(
f: Path,
completionPromise: Promise[IOResult],
bufSize: Int,
startPosition: Long,
openOptions: Set[OpenOption])
extends akka.stream.actor.ActorSubscriber
with ActorLogging {
override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize)
private var chan: FileChannel = _
private var bytesWritten: Long = 0
override def preStart(): Unit =
try {
chan = FileChannel.open(f, openOptions.asJava)
if (startPosition > 0) {
chan.position(startPosition)
}
super.preStart()
} catch {
case ex: Exception =>
closeAndComplete(Failure(ex))
cancel()
}
def receive = {
case ActorSubscriberMessage.OnNext(bytes: ByteString) =>
try {
bytesWritten += chan.write(bytes.asByteBuffer)
} catch {
case ex: Exception =>
closeAndComplete(Success(IOResult(bytesWritten, Failure(ex))))
cancel()
}
case ActorSubscriberMessage.OnError(ex) =>
log.error(ex, "Tearing down FileSink({}) due to upstream error", f)
closeAndComplete(Failure(AbruptIOTerminationException(IOResult(bytesWritten, Success(Done)), ex)))
context.stop(self)
case ActorSubscriberMessage.OnComplete => context.stop(self)
}
override def postStop(): Unit = {
closeAndComplete(Success(IOResult(bytesWritten, Success(Done))))
super.postStop()
}
private def closeAndComplete(result: Try[IOResult]): Unit = {
try {
// close the channel/file before completing the promise, allowing the
// file to be deleted, which would not work (on some systems) if the
// file is still open for writing
if (chan ne null) chan.close()
completionPromise.tryComplete(result)
} catch {
case closingException: Exception =>
result match {
case Success(ioResult) =>
val statusWithClosingException =
ioResult.status.transform(_ => Failure(closingException), ex => Failure(closingException.initCause(ex)))
completionPromise.trySuccess(ioResult.copy(status = statusWithClosingException))
case Failure(ex) => completionPromise.tryFailure(closingException.initCause(ex))
}
}
}
}

View file

@ -5,53 +5,14 @@
package akka.stream.impl.io
import java.io.OutputStream
import java.nio.file.{ OpenOption, Path }
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.Dispatcher
import akka.stream._
import akka.stream.impl.SinkModule
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
/**
* INTERNAL API
* Creates simple synchronous Sink which writes all incoming elements to the given file
* (creating it before hand if necessary).
*/
@InternalApi private[akka] final class FileSink(
f: Path,
startPosition: Long,
options: immutable.Set[OpenOption],
val attributes: Attributes,
shape: SinkShape[ByteString])
extends SinkModule[ByteString, Future[IOResult]](shape) {
override protected def label: String = s"FileSink($f, $options)"
override def create(context: MaterializationContext) = {
val materializer = ActorMaterializerHelper.downcast(context.materializer)
val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max
val ioResultPromise = Promise[IOResult]()
val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options)
val ref = materializer.actorOf(
context,
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[Dispatcher].dispatcher))
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)
}
override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] =
new FileSink(f, startPosition, options, attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[ByteString, Future[IOResult]] =
new FileSink(f, startPosition, options, attr, amendShape(attr))
}
/**
* INTERNAL API
* Creates simple synchronous Sink which writes all incoming elements to the output stream.

View file

@ -20,8 +20,6 @@ import scala.concurrent.Future
*/
object FileIO {
import Sink.{ shape => sinkShape }
/**
* Creates a Source from a files contents.
* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
@ -139,5 +137,5 @@ object FileIO {
* @param startPosition the start position to write to
*/
def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =
Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
Sink.fromGraph(new FileOutputStage(f, startPosition, options))
}