From 200b07e534abf9c6c48107ff91b6b2a47679971d Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Tue, 2 Jul 2019 10:46:16 +0100 Subject: [PATCH] Re-write InputStreamSource as GraphStage (#26811) * Re-write InputStreamPublisher as a GraphStage * Deprecate IOResult.failure and introduce IOOperationIncompleteException After some dicussion with @johanandren, @raboof and @2m about the confusion of the materialised value of the IO streams to complete even if there is an exception (with the exception in the IOResult) this now changes it to: * Deprecate failure in IOResult so it is always set to Success(Done) * Stop using AbrubtIOTerminationException as the inner IOResult also contains an exception causing confusion --- .../project/migration-guide-2.5.x-2.6.x.md | 9 +- .../scala/akka/stream/io/FileSinkSpec.scala | 2 + .../scala/akka/stream/io/FileSourceSpec.scala | 2 + .../stream/io/InputStreamSourceSpec.scala | 114 +++++++++++++----- .../akka/stream/io/OutputStreamSinkSpec.scala | 2 + .../mima-filters/2.5.x.backwards.excludes | 17 +++ .../src/main/scala/akka/stream/IOResult.scala | 22 +++- .../scala/akka/stream/impl/io/IOSources.scala | 46 ------- .../stream/impl/io/InputStreamPublisher.scala | 94 --------------- .../stream/impl/io/InputStreamSource.scala | 114 ++++++++++++++++++ .../stream/scaladsl/StreamConverters.scala | 7 +- 11 files changed, 252 insertions(+), 177 deletions(-) delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 29af48c377..3193b2e745 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -266,7 +266,7 @@ blocking and protect a bit against starving the internal actors. Since the inter the default dispatcher has been adjusted down to `1.0` which means the number of threads will be one per core, but at least `8` and at most `64`. This can be tuned using the individual settings in `akka.actor.default-dispatcher.fork-join-executor`. -### Cluster sharding +### Cluster Sharding #### waiting-for-state-timeout reduced to 2s @@ -274,7 +274,7 @@ This has been reduced to speed up ShardCoordinator initialization in smaller clu The read from ddata is a ReadMajority, for small clusters (< majority-min-cap) every node needs to respond so is more likely to timeout if there are nodes restarting e.g. when there is a rolling re-deploy happening. -### Passivate idle entity +#### Passivate idle entity The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default. Sharding will passivate entities when they have not received any messages after this duration. @@ -301,6 +301,11 @@ and then it will behave as in Akka 2.5.x: akka.coordinated-shutdown.run-by-actor-system-terminate = off ``` +### IOSources + +`StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required +to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure +the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException]. ### Akka now uses Fork Join Pool from JDK diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 8025321b08..6b4e475fe8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -23,7 +23,9 @@ import com.google.common.jimfs.{ Configuration, Jimfs } import scala.collection.mutable.ListBuffer import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ +import com.github.ghik.silencer.silent +@silent class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index 42fec4feee..cf7b39aaee 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -23,11 +23,13 @@ import akka.util.ByteString import com.google.common.jimfs.{ Configuration, Jimfs } import scala.concurrent.duration._ +import com.github.ghik.silencer.silent object FileSourceSpec { final case class Settings(chunkSize: Int, readAhead: Int) } +@silent class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala index 9ebaa841f0..c6d579a7f4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSourceSpec.scala @@ -4,56 +4,110 @@ package akka.stream.io -import java.io.InputStream +import java.io.{ ByteArrayInputStream, InputStream } import java.util.concurrent.CountDownLatch -import akka.stream.scaladsl.{ Sink, StreamConverters } +import akka.Done +import akka.stream.scaladsl.{ Keep, Sink, StreamConverters } import akka.stream.testkit._ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.{ AbruptStageTerminationException, ActorMaterializer, ActorMaterializerSettings, IOResult } import akka.util.ByteString -import scala.concurrent.Await -import scala.concurrent.duration._ +import scala.util.Success +import com.github.ghik.silencer.silent + +@silent class InputStreamSourceSpec extends StreamSpec(UnboundedMailboxConfig) { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") implicit val materializer = ActorMaterializer(settings) - "InputStreamSource" must { + private def inputStreamFor(bytes: Array[Byte]): InputStream = + new ByteArrayInputStream(bytes) - "not signal when no demand" in { - val f = StreamConverters.fromInputStream(() => - new InputStream { - override def read(): Int = 42 - }) - - Await.result(f.takeWithin(5.seconds).runForeach(_ => ()), 10.seconds) - } + "InputStream Source" must { "read bytes from InputStream" in assertAllStagesStopped { - val f = StreamConverters - .fromInputStream(() => - new InputStream { - @volatile var buf = List("a", "b", "c").map(_.charAt(0).toInt) - override def read(): Int = { - buf match { - case head :: tail => - buf = tail - head - case Nil => - -1 - } - - } - }) - .runWith(Sink.head) + val f = + StreamConverters.fromInputStream(() => inputStreamFor(Array('a', 'b', 'c').map(_.toByte))).runWith(Sink.head) f.futureValue should ===(ByteString("abc")) } + "record number of bytes read" in assertAllStagesStopped { + StreamConverters + .fromInputStream(() => inputStreamFor(Array(1, 2, 3))) + .toMat(Sink.ignore)(Keep.left) + .run + .futureValue shouldEqual IOResult(3, Success(Done)) + } + + "return fail if close fails" in assertAllStagesStopped { + val fail = new RuntimeException("oh dear") + StreamConverters + .fromInputStream(() => + new InputStream { + override def read(): Int = -1 + override def close(): Unit = throw fail + }) + .toMat(Sink.ignore)(Keep.left) + .run + .failed + .futureValue + .getCause shouldEqual fail + } + + "return failure if creation fails" in { + val fail = new RuntimeException("oh dear indeed") + StreamConverters + .fromInputStream(() => { + throw fail + }) + .toMat(Sink.ignore)(Keep.left) + .run + .failed + .futureValue + .getCause shouldEqual fail + } + + "handle failure on read" in assertAllStagesStopped { + val fail = new RuntimeException("oh dear indeed") + StreamConverters + .fromInputStream(() => () => throw fail) + .toMat(Sink.ignore)(Keep.left) + .run + .failed + .futureValue + .getCause shouldEqual fail + } + + "include number of bytes when downstream doesn't read all of it" in { + val f = StreamConverters + .fromInputStream(() => inputStreamFor(Array.fill(100)(1)), 1) + .take(1) // stream is not completely read + .toMat(Sink.ignore)(Keep.left) + .run + .futureValue + + f.status shouldEqual Success(Done) + f.count shouldBe >=(1L) + } + + "handle actor materializer shutdown" in { + val mat = ActorMaterializer() + val source = StreamConverters.fromInputStream(() => inputStreamFor(Array(1, 2, 3))) + val pubSink = Sink.asPublisher[ByteString](false) + val (f, neverPub) = source.toMat(pubSink)(Keep.both).run()(mat) + val c = TestSubscriber.manualProbe[ByteString]() + neverPub.subscribe(c) + c.expectSubscription() + mat.shutdown() + f.failed.futureValue shouldBe an[AbruptStageTerminationException] + } + "emit as soon as read" in assertAllStagesStopped { val latch = new CountDownLatch(1) val probe = StreamConverters diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala index 10d29fca50..967186ac88 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala @@ -16,7 +16,9 @@ import akka.util.ByteString import scala.concurrent.Await import scala.concurrent.duration._ +import com.github.ghik.silencer.silent +@silent class OutputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) { val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index 0aaaafadf3..722753f54e 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -97,6 +97,14 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPubl ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithUnrestrictedStash") ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorSubscriber$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithStash") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.actor.AbstractActorPublisherWithStash") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.InputStreamSource.withAttributes") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.InputStreamSource.*") +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.io.InputStreamSource") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamSource") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$Continue$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$") # #25045 adding Java/Scala interop to SourceQueue and SinkQueue ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter") @@ -112,3 +120,12 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Restart # Adding methods to Materializer is not compatible but we don't support other Materializer implementations ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.scheduleAtFixedRate") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.scheduleWithFixedDelay") + +#26187 - Re-write ActorPublisher/Subscribers as GraphStages +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.io.InputStreamSource.withAttributes") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.InputStreamSource.*") +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.impl.io.InputStreamSource") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamSource") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$Continue$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$") diff --git a/akka-stream/src/main/scala/akka/stream/IOResult.scala b/akka-stream/src/main/scala/akka/stream/IOResult.scala index d57fd03c11..41b7528bed 100644 --- a/akka-stream/src/main/scala/akka/stream/IOResult.scala +++ b/akka-stream/src/main/scala/akka/stream/IOResult.scala @@ -5,6 +5,7 @@ package akka.stream import akka.Done +import com.github.ghik.silencer.silent import scala.util.control.NoStackTrace import scala.util.{ Failure, Success, Try } @@ -15,9 +16,14 @@ import scala.util.{ Failure, Success, Try } * @param count Numeric value depending on context, for example IO operations performed or bytes processed. * @param status Status of the result. Can be either [[akka.Done]] or an exception. */ -final case class IOResult(count: Long, status: Try[Done]) { +@silent // deprecated success +final case class IOResult( + count: Long, + @deprecated("status is always set to Success(Done)", "2.6.0") status: Try[Done]) { def withCount(value: Long): IOResult = copy(count = value) + + @deprecated("status is always set to Success(Done)", "2.6.0") def withStatus(value: Try[Done]): IOResult = copy(status = value) /** @@ -28,12 +34,14 @@ final case class IOResult(count: Long, status: Try[Done]) { /** * Java API: Indicates whether IO operation completed successfully or not. */ + @deprecated("status is always set to Success(Done)", "2.6.0") def wasSuccessful: Boolean = status.isSuccess /** * Java API: If the IO operation resulted in an error, returns the corresponding [[Throwable]] * or throws [[UnsupportedOperationException]] otherwise. */ + @deprecated("status is always set to Success(Done)", "2.6.0") def getError: Throwable = status match { case Failure(t) => t case Success(_) => throw new UnsupportedOperationException("IO operation was successful.") @@ -43,6 +51,8 @@ final case class IOResult(count: Long, status: Try[Done]) { object IOResult { + def apply(count: Long): IOResult = IOResult(count, Success(Done)) + /** JAVA API: Creates successful IOResult */ def createSuccessful(count: Long): IOResult = new IOResult(count, Success(Done)) @@ -56,6 +66,16 @@ object IOResult { * This exception signals that a stream has been completed by an onError signal * while there was still IO operations in progress. */ +@deprecated("use IOOperationIncompleteException", "2.6.0") final case class AbruptIOTerminationException(ioResult: IOResult, cause: Throwable) extends RuntimeException("Stream terminated without completing IO operation.", cause) with NoStackTrace + +/** + * This exception signals that a stream has been completed or has an error while + * there was still IO operations in progress + * + * @param count The number of bytes read/written up until the error + * @param cause cause + */ +final class IOOperationIncompleteException(val count: Long, cause: Throwable) extends RuntimeException(cause) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index b9701dbd0a..582697a5c0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -4,21 +4,15 @@ package akka.stream.impl.io -import java.io.InputStream import java.nio.ByteBuffer import java.nio.channels.{ CompletionHandler, FileChannel } import java.nio.file.{ Files, NoSuchFileException, Path, StandardOpenOption } import akka.Done -import akka.annotation.InternalApi -import akka.stream.ActorAttributes.Dispatcher import akka.stream.Attributes.InputBuffer -import akka.stream.impl.{ ErrorPublisher, SourceModule } import akka.stream.stage._ import akka.stream.{ IOResult, _ } import akka.util.ByteString -import com.github.ghik.silencer.silent -import org.reactivestreams.Publisher import scala.annotation.tailrec import scala.concurrent.{ Future, Promise } @@ -140,43 +134,3 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: override def toString = s"FileSource($path, $chunkSize)" } - -/** - * INTERNAL API - * Source backed by the given input stream. - */ -@InternalApi private[akka] final class InputStreamSource( - createInputStream: () => InputStream, - chunkSize: Int, - val attributes: Attributes, - shape: SourceShape[ByteString]) - extends SourceModule[ByteString, Future[IOResult]](shape) { - override def create(context: MaterializationContext) = { - val materializer = ActorMaterializerHelper.downcast(context.materializer) - val ioResultPromise = Promise[IOResult]() - - @silent - val pub = try { - val is = createInputStream() // can throw, i.e. FileNotFound - - val props = InputStreamPublisher - .props(is, ioResultPromise, chunkSize) - .withDispatcher(context.effectiveAttributes.mandatoryAttribute[Dispatcher].dispatcher) - - val ref = materializer.actorOf(context, props) - akka.stream.actor.ActorPublisher[ByteString](ref) - } catch { - case ex: Exception => - ioResultPromise.failure(ex) - ErrorPublisher(ex, attributes.nameOrDefault("inputStreamSource")).asInstanceOf[Publisher[ByteString]] - } - - (pub, ioResultPromise.future) - } - - override protected def newInstance(shape: SourceShape[ByteString]): SourceModule[ByteString, Future[IOResult]] = - new InputStreamSource(createInputStream, chunkSize, attributes, shape) - - override def withAttributes(attr: Attributes): SourceModule[ByteString, Future[IOResult]] = - new InputStreamSource(createInputStream, chunkSize, attr, amendShape(attr)) -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala deleted file mode 100644 index e31e9d5132..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2015-2019 Lightbend Inc. - */ - -package akka.stream.impl.io - -import java.io.InputStream - -import akka.Done -import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props } -import akka.annotation.InternalApi -import akka.stream.actor.ActorPublisherMessage -import akka.stream.IOResult -import akka.util.ByteString -import com.github.ghik.silencer.silent - -import scala.concurrent.Promise -import scala.util.{ Failure, Success } - -/** INTERNAL API */ -@InternalApi private[akka] object InputStreamPublisher { - - def props(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int): Props = { - require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") - - Props(classOf[InputStreamPublisher], is, completionPromise, chunkSize).withDeploy(Deploy.local) - } - - private final case object Continue extends DeadLetterSuppression -} - -/** INTERNAL API */ -@silent -@InternalApi private[akka] class InputStreamPublisher( - is: InputStream, - completionPromise: Promise[IOResult], - chunkSize: Int) - extends akka.stream.actor.ActorPublisher[ByteString] - with ActorLogging { - - // TODO possibly de-duplicate with FilePublisher? - - import InputStreamPublisher._ - - val arr = new Array[Byte](chunkSize) - var readBytesTotal = 0L - - def receive = { - case ActorPublisherMessage.Request(_) => readAndSignal() - case Continue => readAndSignal() - case ActorPublisherMessage.Cancel => context.stop(self) - } - - def readAndSignal(): Unit = - if (isActive) { - readAndEmit() - if (totalDemand > 0 && isActive) self ! Continue - } - - def readAndEmit(): Unit = - if (totalDemand > 0) try { - // blocking read - val readBytes = is.read(arr) - - readBytes match { - case -1 => - // had nothing to read into this chunk - log.debug("No more bytes available to read (got `-1` from `read`)") - onCompleteThenStop() - - case _ => - readBytesTotal += readBytes - - // emit immediately, as this is the only chance to do it before we might block again - onNext(ByteString.fromArray(arr, 0, readBytes)) - } - } catch { - case ex: Exception => - onErrorThenStop(ex) - } - - override def postStop(): Unit = { - super.postStop() - - try { - if (is ne null) is.close() - } catch { - case ex: Exception => - completionPromise.success(IOResult(readBytesTotal, Failure(ex))) - } - - completionPromise.trySuccess(IOResult(readBytesTotal, Success(Done))) - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala new file mode 100644 index 0000000000..8dc46f08dd --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.impl.io + +import java.io.InputStream + +import akka.annotation.InternalApi +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.{ + AbruptStageTerminationException, + Attributes, + IOOperationIncompleteException, + IOResult, + Outlet, + SourceShape +} +import akka.stream.stage.{ GraphStageLogic, GraphStageLogicWithLogging, GraphStageWithMaterializedValue, OutHandler } +import akka.util.ByteString + +import scala.concurrent.{ Future, Promise } +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class InputStreamSource(factory: () => InputStream, chunkSize: Int) + extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] { + + require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") + + private val out: Outlet[ByteString] = Outlet("InputStreamSource.out") + + override def shape: SourceShape[ByteString] = SourceShape(out) + + override protected def initialAttributes: Attributes = DefaultAttributes.inputStreamSource + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = { + val mat = Promise[IOResult] + val logic = new GraphStageLogicWithLogging(shape) with OutHandler { + private val buffer = new Array[Byte](chunkSize) + private var readBytesTotal = 0L + private var inputStream: InputStream = _ + private def isClosed = mat.isCompleted + + override def preStart(): Unit = { + try { + inputStream = factory() + } catch { + case NonFatal(t) => + mat.failure(new IOOperationIncompleteException(0, t)) + failStage(t) + } + } + + override def onPull(): Unit = + try { + inputStream.read(buffer) match { + case -1 => + closeStage() + case readBytes => + readBytesTotal += readBytes + push(out, ByteString.fromArray(buffer, 0, readBytes)) + } + } catch { + case NonFatal(t) => + failStream(t) + failStage(t) + } + + override def onDownstreamFinish(): Unit = { + if (!isClosed) { + closeInputStream() + mat.trySuccess(IOResult(readBytesTotal)) + } + } + + override def postStop(): Unit = { + if (!isClosed) { + mat.tryFailure(new AbruptStageTerminationException(this)) + } + } + + private def closeStage(): Unit = { + closeInputStream() + mat.trySuccess(IOResult(readBytesTotal)) + completeStage() + } + + private def failStream(reason: Throwable): Unit = { + closeInputStream() + mat.tryFailure(new IOOperationIncompleteException(readBytesTotal, reason)) + } + + private def closeInputStream(): Unit = { + try { + if (inputStream != null) + inputStream.close() + } catch { + case NonFatal(ex) => + mat.tryFailure(new IOOperationIncompleteException(readBytesTotal, ex)) + failStage(ex) + } + } + + setHandler(out, this) + } + (logic, mat.future) + + } + override def toString: String = "InputStreamSource" +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala index 2be2c185e7..a20ac7917f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala @@ -24,7 +24,6 @@ import akka.NotUsed */ object StreamConverters { - import Source.{ shape => sourceShape } import Sink.{ shape => sinkShape } /** @@ -45,9 +44,9 @@ object StreamConverters { * @param in a function which creates the InputStream to read from * @param chunkSize the size of each read operation, defaults to 8192 */ - def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = - Source.fromGraph( - new InputStreamSource(in, chunkSize, DefaultAttributes.inputStreamSource, sourceShape("InputStreamSource"))) + def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = { + Source.fromGraph(new InputStreamSource(in, chunkSize)) + } /** * Creates a Source which when materialized will return an [[OutputStream]] which it is possible