From 17c80b3be97b89230be222b62b3820a1974d17b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 18 Mar 2019 14:28:31 +0100 Subject: [PATCH] Race condition in lazy sink on immediate failure (#25413) * Race condition in lazy sink on immediate failure #25410 --- .../akka/stream/scaladsl/LazySinkSpec.scala | 26 +++++++++++++++---- .../main/scala/akka/stream/impl/Sinks.scala | 6 ++++- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala index 5c968cdddc..17772f2968 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -9,14 +9,15 @@ import java.util.concurrent.TimeoutException import akka.NotUsed import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic } -import akka.stream.testkit.{ StreamSpec, TestPublisher } import akka.stream.testkit.TestSubscriber.Probe import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ StreamSpec, TestPublisher } -import scala.concurrent.{ Await, Future, Promise } +import scala.collection.immutable import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future, Promise } class LazySinkSpec extends StreamSpec { @@ -79,7 +80,7 @@ class LazySinkSpec extends StreamSpec { a[RuntimeException] shouldBe thrownBy { Await.result(futureProbe, remainingOrDefault) } } - "failed gracefully when upstream failed" in assertAllStagesStopped { + "fail gracefully when upstream failed" in assertAllStagesStopped { val sourceProbe = TestPublisher.manualProbe[Int]() val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInitAsync(() => Future.successful(TestSink.probe[Int]))) @@ -127,12 +128,27 @@ class LazySinkSpec extends StreamSpec { } } - val result = Source(List("whatever")).runWith(Sink.lazyInitAsync[String, NotUsed](() => { - println("create sink"); Future.successful(Sink.fromGraph(FailingInnerMat)) + val result = Source(List("whatever")).runWith(Sink.lazyInitAsync[String, NotUsed](() ⇒ { + Future.successful(Sink.fromGraph(FailingInnerMat)) })) result.failed.futureValue should ===(matFail) } + + // reproducer for #25410 + "lazily propagate failure" in { + case object MyException extends Exception + val lazyMatVal = Source(List(1)) + .concat(Source.lazily(() ⇒ Source.failed(MyException))) + .runWith(Sink.lazyInitAsync(() ⇒ Future.successful(Sink.seq[Int]))) + + // lazy init async materialized a sink, so we should have a some here + val innerMatVal: Future[immutable.Seq[Int]] = lazyMatVal.futureValue.get + + // the actual matval from Sink.seq should be failed when the stream fails + innerMatVal.failed.futureValue should ===(MyException) + + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 1155c59aa4..70f2b1b6a9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -588,7 +588,11 @@ import akka.util.ccompat._ override def onUpstreamFailure(ex: Throwable): Unit = { // propagate exception irrespective if the cached element has been pushed or not subOutlet.fail(ex) - maybeCompleteStage() + // #25410 if we fail the stage here directly, the SubSource may not have been started yet, + // which can happen if upstream fails immediately after emitting a first value. + // The SubSource won't be started until the stream shuts down, which means downstream won't see the failure, + // scheduling it lets the interpreter first start the substream + getAsyncCallback[Throwable](failStage).invoke(ex) } })