From fc6f2a1bfbb32bcd87de41f8d400bf46f7fab77b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 12 Jan 2016 15:52:03 +0100 Subject: [PATCH] str #19338: NPE in ActorGraphInterpreter due to race with publisher shutdown and subscription (cherry picked from commit 161e01e) --- .../fusing/ActorGraphInterpreterSpec.scala | 80 ++++++++++++++++++- .../impl/fusing/ActorGraphInterpreter.scala | 2 +- 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index 8c806e17ab..9c656f0006 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -3,11 +3,13 @@ */ package akka.stream.impl.fusing +import java.util.concurrent.CountDownLatch + import akka.stream._ import akka.stream.scaladsl._ -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.stream.testkit.AkkaSpec +import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} import akka.stream.testkit.Utils._ +import akka.stream.testkit.{AkkaSpec, TestPublisher, TestSubscriber} import akka.testkit.EventFilter import scala.concurrent.Await @@ -256,5 +258,79 @@ class ActorGraphInterpreterSpec extends AkkaSpec { } + "be able to properly handle case where a stage fails before subscription happens" in assertAllStagesStopped { + + // Fuzzing needs to be off, so that the failure can propagate to the output boundary before the ExposedPublisher + // message. + val noFuzzMat = ActorMaterializer(ActorMaterializerSettings(system).withFuzzing(false)) + + val te = TE("Test failure in preStart") + + val evilLatch = new CountDownLatch(1) + + /* + * This is a somewhat tricky test setup. We need the following conditions to be met: + * - the stage should fail its output port before the ExposedPublisher message is processed + * - the enclosing actor (and therefore the stage) should be kept alive until a stray SubscribePending arrives + * that has been enqueued after ExposedPublisher message has been enqueued, but before it has been processed + * + * To achieve keeping alive the stage for long enough, we use an extra input and output port and instead + * of failing the stage, we fail only the output port under test. + * + * To delay the startup long enough, so both ExposedPublisher and SubscribePending are enqueued, we use an evil + * latch to delay the preStart() (which in turn delays the enclosing actor's preStart). + * + */ + + val failyStage = new GraphStage[FanOutShape2[Int, Int, Int]] { + override val shape: FanOutShape2[Int, Int, Int] = new FanOutShape2( + Inlet[Int]("test.in"), + Outlet[Int]("test.out0"), + Outlet[Int]("test.out1")) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + + override def preStart(): Unit = { + pull(shape.in) + evilLatch.await() + fail(shape.out0, te) + } + + setHandler(shape.out0, ignoreTerminateOutput) //We fail in preStart anyway + setHandler(shape.out1, ignoreTerminateOutput) //We fail in preStart anyway + passAlong(shape.in, shape.out1) + } + } + + val downstream0 = TestSubscriber.probe[Int]() + val downstream1 = TestSubscriber.probe[Int]() + + val upstream = TestPublisher.probe[Int]() + + RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒ + import GraphDSL.Implicits._ + val faily = b.add(failyStage) + + Source.fromPublisher(upstream) ~> faily.in + faily.out0 ~> Sink.fromSubscriber(downstream0) + faily.out1 ~> Sink.fromSubscriber(downstream1) + + ClosedShape + }).run()(noFuzzMat) + + evilLatch.countDown() + downstream0.expectSubscriptionAndError(te) + + // If an NPE would happen due to unset exposedPublisher (see #19338) this would receive a failure instead + // of the actual element + downstream1.request(1) + upstream.sendNext(42) + downstream1.expectNext(42) + + upstream.sendComplete() + downstream1.expectComplete() + + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 5bb63b7a3e..d4e41499a6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -269,12 +269,12 @@ private[stream] object ActorGraphInterpreter { } def exposedPublisher(publisher: ActorPublisher[Any]): Unit = { + exposedPublisher = publisher upstreamFailed match { case _: Some[_] ⇒ publisher.shutdown(upstreamFailed) case _ ⇒ if (upstreamCompleted) publisher.shutdown(None) - else exposedPublisher = publisher } }