#23951 Fail materialized future of IO stages when stream fails

This commit is contained in:
Martynas Mickevičius 2018-02-20 13:49:31 +02:00
parent 2b8b946bc7
commit 24fd986aca
No known key found for this signature in database
GPG key ID: E735DF276C508071
5 changed files with 53 additions and 16 deletions

View file

@ -14,7 +14,7 @@ import akka.stream.impl.StreamSupervisor.Children
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, IOResult }
import akka.stream._
import akka.util.{ ByteString, Timeout }
import com.google.common.jimfs.{ Configuration, Jimfs }
import org.scalatest.BeforeAndAfterAll
@ -204,6 +204,20 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
}
}
"complete materialized future with an exception when upstream fails" in assertAllStagesStopped {
targetFile { f
val completion = Source(TestByteStrings)
.map { bytes
if (bytes.contains('b')) throw new Error("bees!")
bytes
}
.runWith(FileIO.toPath(f))
val ex = intercept[AbruptIOTerminationException] { Await.result(completion, 3.seconds) }
ex.ioResult.count should equal(1001)
checkFileContents(f, TestLines.takeWhile(!_.contains('b')).mkString(""))
}
}
}
private def targetFile(block: Path Unit, create: Boolean = true) {

View file

@ -8,7 +8,7 @@ import java.io.OutputStream
import akka.stream.scaladsl.{ Source, StreamConverters }
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.{ AbruptIOTerminationException, ActorMaterializer, ActorMaterializerSettings }
import akka.testkit.TestProbe
import akka.util.ByteString
@ -39,7 +39,7 @@ class OutputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
"close underlying stream when error received" in assertAllStagesStopped {
val p = TestProbe()
Source.failed(new TE("Boom!"))
Source.failed(TE("Boom!"))
.runWith(StreamConverters.fromOutputStream(() new OutputStream {
override def write(i: Int): Unit = ()
override def close() = p.ref ! "closed"
@ -48,6 +48,16 @@ class OutputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
p.expectMsg("closed")
}
"complete materialized value with the error" in assertAllStagesStopped {
val completion = Source.failed(TE("Boom!"))
.runWith(StreamConverters.fromOutputStream(() new OutputStream {
override def write(i: Int): Unit = ()
override def close() = ()
}))
completion.failed.futureValue shouldBe an[AbruptIOTerminationException]
}
"close underlying stream when completion received" in assertAllStagesStopped {
val p = TestProbe()
Source.empty

View file

@ -4,6 +4,8 @@
package akka.stream
import akka.Done
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success, Try }
/**
@ -48,3 +50,10 @@ object IOResult {
def createFailed(count: Long, ex: Throwable): IOResult =
new IOResult(count, Failure(ex))
}
/**
* This exception signals that a stream has been completed by an onError signal
* while there was still IO operations in progress.
*/
final case class AbruptIOTerminationException(ioResult: IOResult, cause: Throwable)
extends RuntimeException("Stream terminated without completing IO operation.", cause) with NoStackTrace

View file

@ -4,18 +4,18 @@
package akka.stream.impl.io
import java.nio.channels.FileChannel
import java.nio.file.{ Path, OpenOption }
import java.nio.file.{ OpenOption, Path }
import akka.Done
import akka.actor.{ ActorLogging, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.IOResult
import akka.stream.{ AbruptIOTerminationException, IOResult }
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
import akka.util.ByteString
import scala.collection.JavaConverters._
import scala.concurrent.Promise
import scala.util.{ Failure, Success }
import scala.util.{ Failure, Success, Try }
/** INTERNAL API */
@InternalApi private[akka] object FileSubscriber {
@ -46,7 +46,7 @@ import scala.util.{ Failure, Success }
super.preStart()
} catch {
case ex: Exception
closeAndComplete(IOResult(bytesWritten, Failure(ex)))
closeAndComplete(Success(IOResult(bytesWritten, Failure(ex))))
cancel()
}
@ -56,33 +56,37 @@ import scala.util.{ Failure, Success }
bytesWritten += chan.write(bytes.asByteBuffer)
} catch {
case ex: Exception
closeAndComplete(IOResult(bytesWritten, Failure(ex)))
closeAndComplete(Success(IOResult(bytesWritten, Failure(ex))))
cancel()
}
case ActorSubscriberMessage.OnError(ex)
log.error(ex, "Tearing down FileSink({}) due to upstream error", f)
closeAndComplete(IOResult(bytesWritten, Failure(ex)))
closeAndComplete(Failure(AbruptIOTerminationException(IOResult(bytesWritten, Success(Done)), ex)))
context.stop(self)
case ActorSubscriberMessage.OnComplete context.stop(self)
}
override def postStop(): Unit = {
closeAndComplete(IOResult(bytesWritten, Success(Done)))
closeAndComplete(Success(IOResult(bytesWritten, Success(Done))))
super.postStop()
}
private def closeAndComplete(result: IOResult): Unit = {
private def closeAndComplete(result: Try[IOResult]): Unit = {
try {
// close the channel/file before completing the promise, allowing the
// file to be deleted, which would not work (on some systems) if the
// file is still open for writing
if (chan ne null) chan.close()
completionPromise.trySuccess(result)
completionPromise.tryComplete(result)
} catch {
case ex: Exception
completionPromise.trySuccess(IOResult(bytesWritten, Failure(ex)))
case closingException: Exception result match {
case Success(ioResult)
val statusWithClosingException = ioResult.status.transform(d Failure(closingException), ex Failure(closingException.initCause(ex)))
completionPromise.trySuccess(ioResult.copy(status = statusWithClosingException))
case Failure(ex) completionPromise.tryFailure(closingException.initCause(ex))
}
}
}
}

View file

@ -9,7 +9,7 @@ import akka.Done
import akka.actor.{ ActorLogging, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
import akka.stream.IOResult
import akka.stream.{ AbruptIOTerminationException, IOResult }
import akka.util.ByteString
import scala.concurrent.Promise
@ -48,7 +48,7 @@ import scala.util.{ Failure, Success }
case ActorSubscriberMessage.OnError(ex)
log.error(ex, "Tearing down OutputStreamSink due to upstream error, wrote bytes: {}", bytesWritten)
completionPromise.success(IOResult(bytesWritten, Failure(ex)))
completionPromise.failure(AbruptIOTerminationException(IOResult(bytesWritten, Success(Done)), ex))
context.stop(self)
case ActorSubscriberMessage.OnComplete