diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 7ac2848aee..2d0d8e9793 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -14,7 +14,7 @@ import akka.stream.impl.StreamSupervisor.Children import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.testkit._ import akka.stream.testkit.Utils._ -import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, IOResult } +import akka.stream._ import akka.util.{ ByteString, Timeout } import com.google.common.jimfs.{ Configuration, Jimfs } import org.scalatest.BeforeAndAfterAll @@ -204,6 +204,20 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { } } + "complete materialized future with an exception when upstream fails" in assertAllStagesStopped { + targetFile { f ⇒ + val completion = Source(TestByteStrings) + .map { bytes ⇒ + if (bytes.contains('b')) throw new Error("bees!") + bytes + } + .runWith(FileIO.toPath(f)) + + val ex = intercept[AbruptIOTerminationException] { Await.result(completion, 3.seconds) } + ex.ioResult.count should equal(1001) + checkFileContents(f, TestLines.takeWhile(!_.contains('b')).mkString("")) + } + } } private def targetFile(block: Path ⇒ Unit, create: Boolean = true) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala index 41a0112cf9..3546a01467 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSinkSpec.scala @@ -8,7 +8,7 @@ import java.io.OutputStream import akka.stream.scaladsl.{ Source, StreamConverters } import akka.stream.testkit._ import akka.stream.testkit.Utils._ -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.{ AbruptIOTerminationException, ActorMaterializer, ActorMaterializerSettings } import akka.testkit.TestProbe import akka.util.ByteString @@ -39,7 +39,7 @@ class OutputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) { "close underlying stream when error received" in assertAllStagesStopped { val p = TestProbe() - Source.failed(new TE("Boom!")) + Source.failed(TE("Boom!")) .runWith(StreamConverters.fromOutputStream(() ⇒ new OutputStream { override def write(i: Int): Unit = () override def close() = p.ref ! "closed" @@ -48,6 +48,16 @@ class OutputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) { p.expectMsg("closed") } + "complete materialized value with the error" in assertAllStagesStopped { + val completion = Source.failed(TE("Boom!")) + .runWith(StreamConverters.fromOutputStream(() ⇒ new OutputStream { + override def write(i: Int): Unit = () + override def close() = () + })) + + completion.failed.futureValue shouldBe an[AbruptIOTerminationException] + } + "close underlying stream when completion received" in assertAllStagesStopped { val p = TestProbe() Source.empty diff --git a/akka-stream/src/main/scala/akka/stream/IOResult.scala b/akka-stream/src/main/scala/akka/stream/IOResult.scala index 47c214c61e..785e0e877d 100644 --- a/akka-stream/src/main/scala/akka/stream/IOResult.scala +++ b/akka-stream/src/main/scala/akka/stream/IOResult.scala @@ -4,6 +4,8 @@ package akka.stream import akka.Done + +import scala.util.control.NoStackTrace import scala.util.{ Failure, Success, Try } /** @@ -48,3 +50,10 @@ object IOResult { def createFailed(count: Long, ex: Throwable): IOResult = new IOResult(count, Failure(ex)) } + +/** + * This exception signals that a stream has been completed by an onError signal + * while there was still IO operations in progress. + */ +final case class AbruptIOTerminationException(ioResult: IOResult, cause: Throwable) + extends RuntimeException("Stream terminated without completing IO operation.", cause) with NoStackTrace diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 778b8f2656..c65eadd3b2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -4,18 +4,18 @@ package akka.stream.impl.io import java.nio.channels.FileChannel -import java.nio.file.{ Path, OpenOption } +import java.nio.file.{ OpenOption, Path } import akka.Done import akka.actor.{ ActorLogging, Deploy, Props } import akka.annotation.InternalApi -import akka.stream.IOResult +import akka.stream.{ AbruptIOTerminationException, IOResult } import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.util.ByteString import scala.collection.JavaConverters._ import scala.concurrent.Promise -import scala.util.{ Failure, Success } +import scala.util.{ Failure, Success, Try } /** INTERNAL API */ @InternalApi private[akka] object FileSubscriber { @@ -46,7 +46,7 @@ import scala.util.{ Failure, Success } super.preStart() } catch { case ex: Exception ⇒ - closeAndComplete(IOResult(bytesWritten, Failure(ex))) + closeAndComplete(Success(IOResult(bytesWritten, Failure(ex)))) cancel() } @@ -56,33 +56,37 @@ import scala.util.{ Failure, Success } bytesWritten += chan.write(bytes.asByteBuffer) } catch { case ex: Exception ⇒ - closeAndComplete(IOResult(bytesWritten, Failure(ex))) + closeAndComplete(Success(IOResult(bytesWritten, Failure(ex)))) cancel() } case ActorSubscriberMessage.OnError(ex) ⇒ log.error(ex, "Tearing down FileSink({}) due to upstream error", f) - closeAndComplete(IOResult(bytesWritten, Failure(ex))) + closeAndComplete(Failure(AbruptIOTerminationException(IOResult(bytesWritten, Success(Done)), ex))) context.stop(self) case ActorSubscriberMessage.OnComplete ⇒ context.stop(self) } override def postStop(): Unit = { - closeAndComplete(IOResult(bytesWritten, Success(Done))) + closeAndComplete(Success(IOResult(bytesWritten, Success(Done)))) super.postStop() } - private def closeAndComplete(result: IOResult): Unit = { + private def closeAndComplete(result: Try[IOResult]): Unit = { try { // close the channel/file before completing the promise, allowing the // file to be deleted, which would not work (on some systems) if the // file is still open for writing if (chan ne null) chan.close() - completionPromise.trySuccess(result) + completionPromise.tryComplete(result) } catch { - case ex: Exception ⇒ - completionPromise.trySuccess(IOResult(bytesWritten, Failure(ex))) + case closingException: Exception ⇒ result match { + case Success(ioResult) ⇒ + val statusWithClosingException = ioResult.status.transform(d ⇒ Failure(closingException), ex ⇒ Failure(closingException.initCause(ex))) + completionPromise.trySuccess(ioResult.copy(status = statusWithClosingException)) + case Failure(ex) ⇒ completionPromise.tryFailure(closingException.initCause(ex)) + } } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala index 3943aa963d..4b934a5934 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala @@ -9,7 +9,7 @@ import akka.Done import akka.actor.{ ActorLogging, Deploy, Props } import akka.annotation.InternalApi import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } -import akka.stream.IOResult +import akka.stream.{ AbruptIOTerminationException, IOResult } import akka.util.ByteString import scala.concurrent.Promise @@ -48,7 +48,7 @@ import scala.util.{ Failure, Success } case ActorSubscriberMessage.OnError(ex) ⇒ log.error(ex, "Tearing down OutputStreamSink due to upstream error, wrote bytes: {}", bytesWritten) - completionPromise.success(IOResult(bytesWritten, Failure(ex))) + completionPromise.failure(AbruptIOTerminationException(IOResult(bytesWritten, Success(Done)), ex)) context.stop(self) case ActorSubscriberMessage.OnComplete ⇒