From d4167bc930cd4d4dc0bc8da9e3be269dd8c56587 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Mon, 8 Jul 2019 08:54:59 +0100 Subject: [PATCH] Re-write file sink as a graph stage (#27247) * Re-write file sink as a graph stage Refs #26187 --- .../project/migration-guide-2.5.x-2.6.x.md | 4 +- .../scala/akka/stream/io/FileSinkSpec.scala | 69 +++++------ .../mima-filters/2.5.x.backwards.excludes | 3 + .../akka/stream/impl/io/FileOutputStage.scala | 107 +++++++++++++++++ .../akka/stream/impl/io/FileSubscriber.scala | 108 ------------------ .../scala/akka/stream/impl/io/IOSinks.scala | 39 ------- .../scala/akka/stream/scaladsl/FileIO.scala | 4 +- 7 files changed, 148 insertions(+), 186 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/impl/io/FileOutputStage.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 37eb649869..2688245243 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -306,9 +306,9 @@ and then it will behave as in Akka 2.5.x: akka.coordinated-shutdown.run-by-actor-system-terminate = off ``` -### IOSources +### IOSources & FileIO -`StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required +`FileIO.toPath` and `StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException]. diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 6b4e475fe8..8ac6f830b2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -7,26 +7,26 @@ package akka.stream.io import java.nio.file.StandardOpenOption.{ CREATE, WRITE } import java.nio.file._ -import akka.actor.ActorSystem -import akka.dispatch.{ Dispatchers, ExecutionContexts } -import akka.stream.impl.PhasedFusingActorMaterializer -import akka.stream.impl.StreamSupervisor -import akka.stream.impl.StreamSupervisor.Children -import akka.stream.scaladsl.{ FileIO, Sink, Source } -import akka.stream.testkit._ -import akka.stream.testkit.Utils._ -import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.dispatch.ExecutionContexts import akka.stream._ +import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } +import akka.stream.impl.StreamSupervisor.Children +import akka.stream.scaladsl.{ FileIO, Keep, Sink, Source } +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.util.ByteString +import com.github.ghik.silencer.silent import com.google.common.jimfs.{ Configuration, Jimfs } +import org.scalatest.concurrent.ScalaFutures import scala.collection.mutable.ListBuffer -import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import com.github.ghik.silencer.silent +import scala.concurrent.{ Await, Future } +import scala.util.Success @silent -class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { +class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") implicit val materializer = ActorMaterializer(settings) @@ -159,40 +159,40 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { targetFile { f => - val sys = ActorSystem("FileSinkSpec-dispatcher-testing-1", UnboundedMailboxConfig) - val materializer = ActorMaterializer()(sys) + val forever = Source.maybe.toMat(FileIO.toPath(f))(Keep.left).run() try { - Source.fromIterator(() => Iterator.continually(TestByteStrings.head)).runWith(FileIO.toPath(f))(materializer) - materializer .asInstanceOf[PhasedFusingActorMaterializer] .supervisor .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get - // haven't figured out why this returns the aliased id rather than the id, but the stage is going away so whatever - assertDispatcher(ref, Dispatchers.DefaultBlockingDispatcherId) - } finally shutdown(sys) + val children = expectMsgType[Children] + val ref = withClue(children) { + val fileSink = children.children.find(_.path.toString contains "fileSink") + fileSink shouldBe defined + fileSink.get + } + assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) + } finally { + forever.complete(Success(None)) + } } } "allow overriding the dispatcher using Attributes" in assertAllStagesStopped { targetFile { f => - val sys = ActorSystem("FileSinkSpec-dispatcher-testing-2", UnboundedMailboxConfig) - val materializer = ActorMaterializer()(sys) - + val forever = Source.maybe + .toMat(FileIO.toPath(f).addAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher")))(Keep.left) + .run() try { - Source - .fromIterator(() => Iterator.continually(TestByteStrings.head)) - .to(FileIO.toPath(f).addAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher"))) - .run()(materializer) - materializer .asInstanceOf[PhasedFusingActorMaterializer] .supervisor .tell(StreamSupervisor.GetChildren, testActor) val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSink").get assertDispatcher(ref, "akka.actor.default-dispatcher") - } finally shutdown(sys) + } finally { + forever.complete(Success(None)) + } } } @@ -209,22 +209,23 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { }(ExecutionContexts.sameThreadExecutionContext))) Await.result(completion, 3.seconds) - checkFileContents(f, TestLines.head) } } "complete materialized future with an exception when upstream fails" in assertAllStagesStopped { + val te = TE("oh no") targetFile { f => val completion = Source(TestByteStrings) .map { bytes => - if (bytes.contains('b')) throw new Error("bees!") + if (bytes.contains('b')) throw te bytes } .runWith(FileIO.toPath(f)) - val ex = intercept[AbruptIOTerminationException] { Await.result(completion, 3.seconds) } - ex.ioResult.count should equal(1001) + val ex = intercept[IOOperationIncompleteException] { Await.result(completion, 3.seconds) } + ex.count should equal(1001) + ex.getCause should equal(te) checkFileContents(f, TestLines.takeWhile(!_.contains('b')).mkString("")) } } @@ -233,7 +234,7 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { val completion = Source.single(ByteString("42")).runWith(FileIO.toPath(fs.getPath("/I/hope/this/file/doesnt/exist.txt"))) - completion.failed.futureValue shouldBe an[NoSuchFileException] + completion.failed.futureValue.getCause shouldBe an[NoSuchFileException] } } diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index 3b83b4a85e..f153eaf41e 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -108,6 +108,9 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPubl ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSinkActor$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSink") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.ActorRefSinkActor") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSink") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.FileSubscriber") # #25045 adding Java/Scala interop to SourceQueue and SinkQueue diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileOutputStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileOutputStage.scala new file mode 100644 index 0000000000..c28ea99888 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileOutputStage.scala @@ -0,0 +1,107 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.impl.io + +import java.nio.channels.FileChannel +import java.nio.file.{ OpenOption, Path } + +import scala.collection.immutable +import akka.annotation.InternalApi +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.{ + AbruptStageTerminationException, + Attributes, + IOOperationIncompleteException, + IOResult, + Inlet, + SinkShape +} +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler } +import akka.util.ByteString +import akka.util.ccompat.JavaConverters._ + +import scala.concurrent.{ Future, Promise } +import scala.util.Success +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class FileOutputStage(path: Path, startPosition: Long, openOptions: immutable.Set[OpenOption]) + extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] { + + val in: Inlet[ByteString] = Inlet("FileSink") + override def shape: SinkShape[ByteString] = SinkShape(in) + override def initialAttributes: Attributes = DefaultAttributes.fileSink + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = { + val mat = Promise[IOResult] + val logic = new GraphStageLogic(shape) with InHandler { + private var chan: FileChannel = _ + private var bytesWritten: Long = 0 + + override def preStart(): Unit = { + try { + chan = FileChannel.open(path, openOptions.asJava) + if (startPosition > 0) { + chan.position(startPosition) + } + pull(in) + } catch { + case NonFatal(t) => + closeFile(Some(new IOOperationIncompleteException(bytesWritten, t))) + failStage(t) + } + } + + override def onPush(): Unit = { + val next = grab(in) + try { + bytesWritten += chan.write(next.asByteBuffer) + pull(in) + } catch { + case NonFatal(t) => + closeFile(Some(new IOOperationIncompleteException(bytesWritten, t))) + failStage(t) + } + } + + override def onUpstreamFailure(t: Throwable): Unit = { + closeFile(Some(new IOOperationIncompleteException(bytesWritten, t))) + failStage(t) + } + + override def onUpstreamFinish(): Unit = { + closeFile(None) + completeStage() + } + + override def postStop(): Unit = { + if (!mat.isCompleted) { + val failure = new AbruptStageTerminationException(this) + closeFile(Some(failure)) + mat.tryFailure(failure) + } + } + + private def closeFile(failed: Option[Throwable]): Unit = { + try { + if (chan ne null) chan.close() + failed match { + case Some(t) => mat.tryFailure(t) + case None => mat.tryComplete(Success(IOResult(bytesWritten))) + } + } catch { + case NonFatal(t) => + mat.tryFailure(failed.getOrElse(t)) + } + } + + setHandler(in, this) + } + (logic, mat.future) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala deleted file mode 100644 index bbca647c03..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package akka.stream.impl.io - -import java.nio.channels.FileChannel -import java.nio.file.{ OpenOption, Path } - -import akka.Done -import akka.actor.{ ActorLogging, Deploy, Props } -import akka.annotation.InternalApi -import akka.stream.{ AbruptIOTerminationException, IOResult } -import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } -import akka.util.ByteString -import com.github.ghik.silencer.silent - -import akka.util.ccompat.JavaConverters._ -import scala.concurrent.Promise -import scala.util.{ Failure, Success, Try } - -/** INTERNAL API */ -@InternalApi private[akka] object FileSubscriber { - def props( - f: Path, - completionPromise: Promise[IOResult], - bufSize: Int, - startPosition: Long, - openOptions: Set[OpenOption]) = { - require(bufSize > 0, "buffer size must be > 0") - require(startPosition >= 0, s"startPosition must be >= 0 (was $startPosition)") - Props(classOf[FileSubscriber], f, completionPromise, bufSize, startPosition, openOptions).withDeploy(Deploy.local) - } -} - -/** INTERNAL API */ -@silent -@InternalApi private[akka] class FileSubscriber( - f: Path, - completionPromise: Promise[IOResult], - bufSize: Int, - startPosition: Long, - openOptions: Set[OpenOption]) - extends akka.stream.actor.ActorSubscriber - with ActorLogging { - - override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize) - - private var chan: FileChannel = _ - - private var bytesWritten: Long = 0 - - override def preStart(): Unit = - try { - chan = FileChannel.open(f, openOptions.asJava) - if (startPosition > 0) { - chan.position(startPosition) - } - - super.preStart() - } catch { - case ex: Exception => - closeAndComplete(Failure(ex)) - cancel() - } - - def receive = { - case ActorSubscriberMessage.OnNext(bytes: ByteString) => - try { - bytesWritten += chan.write(bytes.asByteBuffer) - } catch { - case ex: Exception => - closeAndComplete(Success(IOResult(bytesWritten, Failure(ex)))) - cancel() - } - - case ActorSubscriberMessage.OnError(ex) => - log.error(ex, "Tearing down FileSink({}) due to upstream error", f) - closeAndComplete(Failure(AbruptIOTerminationException(IOResult(bytesWritten, Success(Done)), ex))) - context.stop(self) - - case ActorSubscriberMessage.OnComplete => context.stop(self) - } - - override def postStop(): Unit = { - closeAndComplete(Success(IOResult(bytesWritten, Success(Done)))) - super.postStop() - } - - private def closeAndComplete(result: Try[IOResult]): Unit = { - try { - // close the channel/file before completing the promise, allowing the - // file to be deleted, which would not work (on some systems) if the - // file is still open for writing - if (chan ne null) chan.close() - completionPromise.tryComplete(result) - } catch { - case closingException: Exception => - result match { - case Success(ioResult) => - val statusWithClosingException = - ioResult.status.transform(_ => Failure(closingException), ex => Failure(closingException.initCause(ex))) - completionPromise.trySuccess(ioResult.copy(status = statusWithClosingException)) - case Failure(ex) => completionPromise.tryFailure(closingException.initCause(ex)) - } - } - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index ddbda46dd5..74d6a0b21b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -5,53 +5,14 @@ package akka.stream.impl.io import java.io.OutputStream -import java.nio.file.{ OpenOption, Path } - import akka.annotation.InternalApi import akka.stream.ActorAttributes.Dispatcher import akka.stream._ import akka.stream.impl.SinkModule import akka.util.ByteString -import scala.collection.immutable import scala.concurrent.{ Future, Promise } -/** - * INTERNAL API - * Creates simple synchronous Sink which writes all incoming elements to the given file - * (creating it before hand if necessary). - */ -@InternalApi private[akka] final class FileSink( - f: Path, - startPosition: Long, - options: immutable.Set[OpenOption], - val attributes: Attributes, - shape: SinkShape[ByteString]) - extends SinkModule[ByteString, Future[IOResult]](shape) { - - override protected def label: String = s"FileSink($f, $options)" - - override def create(context: MaterializationContext) = { - val materializer = ActorMaterializerHelper.downcast(context.materializer) - - val maxInputBufferSize = context.effectiveAttributes.mandatoryAttribute[Attributes.InputBuffer].max - - val ioResultPromise = Promise[IOResult]() - val props = FileSubscriber.props(f, ioResultPromise, maxInputBufferSize, startPosition, options) - val ref = materializer.actorOf( - context, - props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[Dispatcher].dispatcher)) - - (akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future) - } - - override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] = - new FileSink(f, startPosition, options, attributes, shape) - - override def withAttributes(attr: Attributes): SinkModule[ByteString, Future[IOResult]] = - new FileSink(f, startPosition, options, attr, amendShape(attr)) -} - /** * INTERNAL API * Creates simple synchronous Sink which writes all incoming elements to the output stream. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index b9d2f7a6f9..d6d312e0bb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -20,8 +20,6 @@ import scala.concurrent.Future */ object FileIO { - import Sink.{ shape => sinkShape } - /** * Creates a Source from a files contents. * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, @@ -139,5 +137,5 @@ object FileIO { * @param startPosition the start position to write to */ def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] = - Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink"))) + Sink.fromGraph(new FileOutputStage(f, startPosition, options)) }