Race condition in lazy sink on immediate failure (#25413)

* Race condition in lazy sink on immediate failure #25410
This commit is contained in:
Johan Andrén 2019-03-18 14:28:31 +01:00 committed by Patrik Nordwall
parent d358a0c3b5
commit 17c80b3be9
2 changed files with 26 additions and 6 deletions

View file

@ -9,14 +9,15 @@ import java.util.concurrent.TimeoutException
import akka.NotUsed
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic }
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import scala.concurrent.{ Await, Future, Promise }
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
class LazySinkSpec extends StreamSpec {
@ -79,7 +80,7 @@ class LazySinkSpec extends StreamSpec {
a[RuntimeException] shouldBe thrownBy { Await.result(futureProbe, remainingOrDefault) }
}
"failed gracefully when upstream failed" in assertAllStagesStopped {
"fail gracefully when upstream failed" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val futureProbe =
Source.fromPublisher(sourceProbe).runWith(Sink.lazyInitAsync(() => Future.successful(TestSink.probe[Int])))
@ -127,12 +128,27 @@ class LazySinkSpec extends StreamSpec {
}
}
val result = Source(List("whatever")).runWith(Sink.lazyInitAsync[String, NotUsed](() => {
println("create sink"); Future.successful(Sink.fromGraph(FailingInnerMat))
val result = Source(List("whatever")).runWith(Sink.lazyInitAsync[String, NotUsed](() {
Future.successful(Sink.fromGraph(FailingInnerMat))
}))
result.failed.futureValue should ===(matFail)
}
// reproducer for #25410
"lazily propagate failure" in {
case object MyException extends Exception
val lazyMatVal = Source(List(1))
.concat(Source.lazily(() Source.failed(MyException)))
.runWith(Sink.lazyInitAsync(() Future.successful(Sink.seq[Int])))
// lazy init async materialized a sink, so we should have a some here
val innerMatVal: Future[immutable.Seq[Int]] = lazyMatVal.futureValue.get
// the actual matval from Sink.seq should be failed when the stream fails
innerMatVal.failed.futureValue should ===(MyException)
}
}
}

View file

@ -588,7 +588,11 @@ import akka.util.ccompat._
override def onUpstreamFailure(ex: Throwable): Unit = {
// propagate exception irrespective if the cached element has been pushed or not
subOutlet.fail(ex)
maybeCompleteStage()
// #25410 if we fail the stage here directly, the SubSource may not have been started yet,
// which can happen if upstream fails immediately after emitting a first value.
// The SubSource won't be started until the stream shuts down, which means downstream won't see the failure,
// scheduling it lets the interpreter first start the substream
getAsyncCallback[Throwable](failStage).invoke(ex)
}
})