diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala index 9093c5d0a8..244e088e5b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import akka.NotUsed import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } import akka.stream._ -import akka.stream.testkit.Utils.assertAllStagesStopped +import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import scala.concurrent._ import scala.concurrent.duration._ @@ -93,6 +93,22 @@ class FlowFlattenMergeSpec extends StreamSpec { }.cause.get should ===(ex) } + "bubble up substream materialization exception" in assertAllStagesStopped { + val matFail = TE("fail!") + object FailingInnerMat extends GraphStage[SourceShape[String]] { + val out = Outlet[String]("out") + val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + throw matFail + } + } + + val result = Source.single(()).flatMapMerge(4, _ ⇒ Source.fromGraph(FailingInnerMat)).runWith(Sink.ignore) + + result.failed.futureValue should ===(matFail) + + } + "cancel substreams when failing from main stream" in assertAllStagesStopped { val p1, p2 = TestPublisher.probe[Int]() val ex = new Exception("buh") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala index c69a1ffb2e..e3813f8e7d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala @@ -3,9 +3,10 @@ */ package akka.stream.scaladsl +import akka.stream.stage.{ GraphStage, GraphStageLogic } import akka.stream.testkit.StreamSpec import akka.stream.testkit.scaladsl.TestSink -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream._ import akka.stream.testkit.Utils._ import scala.util.control.NoStackTrace @@ -132,5 +133,23 @@ class FlowRecoverWithSpec extends StreamSpec { Flow[Int].recoverWithRetries(-2, { case t: Throwable ⇒ Source.empty[Int] }) } } + + "fail correctly when materialization of recover source fails" in assertAllStagesStopped { + val matFail = TE("fail!") + object FailingInnerMat extends GraphStage[SourceShape[String]] { + val out = Outlet[String]("out") + val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + throw matFail + } + } + + val result = Source.failed(TE("trigger")).recoverWithRetries(1, { + case _: TE ⇒ Source.fromGraph(FailingInnerMat) + }).runWith(Sink.ignore) + + result.failed.futureValue should ===(matFail) + + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala index 0a351558a7..654ca53ca7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -5,15 +5,17 @@ package akka.stream.scaladsl import java.util.concurrent.TimeoutException +import akka.NotUsed import akka.stream.ActorAttributes.supervisionStrategy import akka.stream.Supervision._ import akka.stream._ +import akka.stream.stage.{ GraphStage, GraphStageLogic } import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink -import scala.concurrent.{ Promise, Future, Await } +import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ class LazySinkSpec extends StreamSpec { @@ -141,6 +143,25 @@ class LazySinkSpec extends StreamSpec { val futureProbe = Source.empty.runWith(Sink.lazyInit[Int, Future[Int]](_ ⇒ Future.successful(Sink.fold[Int, Int](0)(_ + _)), () ⇒ throw ex)) a[TE] shouldBe thrownBy { Await.result(futureProbe, 300.millis) } } + + "fail correctly when materialization of inner sink fails" in assertAllStagesStopped { + val matFail = TE("fail!") + object FailingInnerMat extends GraphStage[SinkShape[String]] { + val in = Inlet[String]("in") + val shape = SinkShape(in) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + throw matFail + } + } + + val result = Source.single("whatever") + .runWith( + Sink.lazyInit[String, NotUsed]( + str ⇒ Future.successful(Sink.fromGraph(FailingInnerMat)), + () ⇒ NotUsed)) + + result.failed.futureValue should ===(matFail) + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index 14298abf27..c91d7d9fd0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -115,6 +115,22 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { attributes should contain(Attributes.Name("outer")) attributes.indexOf(Attributes.Name("inner")) < attributes.indexOf(Attributes.Name("outer")) should be(true) } + + "fail correctly when materialization of inner source fails" in assertAllStagesStopped { + val matFail = TE("fail!") + object FailingInnerMat extends GraphStage[SourceShape[String]] { + val out = Outlet[String]("out") + val shape = SourceShape(out) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + throw matFail + } + } + + val result = Source.lazily(() ⇒ Source.fromGraph(FailingInnerMat)).to(Sink.ignore).run() + + result.failed.futureValue should ===(matFail) + + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 0a43c76706..eaf71962d5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -552,7 +552,14 @@ import akka.util.OptionVal } switchToFirstElementHandlers() - promise.trySuccess(Source.fromGraph(sourceOut.source).runWith(sink)(interpreter.subFusingMaterializer)) + try { + val matVal = Source.fromGraph(sourceOut.source).runWith(sink)(interpreter.subFusingMaterializer) + promise.trySuccess(matVal) + } catch { + case NonFatal(ex) ⇒ + promise.tryFailure(ex) + failStage(ex) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index d833e0b8e0..95e0894ba0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -391,10 +391,15 @@ import scala.util.control.NonFatal } }) - matPromise.tryComplete( - Try { - subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes) - }) + try { + val matVal = subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes) + matPromise.trySuccess(matVal) + } catch { + case NonFatal(ex) ⇒ + subSink.cancel() + failStage(ex) + matPromise.tryFailure(ex) + } } setHandler(out, this)