Merge pull request #19424 from drewhk/wip-19338-fwd-port-drewhk

str #19338: NPE in ActorGraphInterpreter (fwd port)
This commit is contained in:
Roland Kuhn 2016-01-14 08:09:33 +01:00
commit e92429dab9
2 changed files with 79 additions and 3 deletions

View file

@ -3,11 +3,13 @@
*/
package akka.stream.impl.fusing
import java.util.concurrent.CountDownLatch
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils._
import akka.stream.testkit.{AkkaSpec, TestPublisher, TestSubscriber}
import akka.testkit.EventFilter
import scala.concurrent.Await
@ -256,5 +258,79 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
}
"be able to properly handle case where a stage fails before subscription happens" in assertAllStagesStopped {
// Fuzzing needs to be off, so that the failure can propagate to the output boundary before the ExposedPublisher
// message.
val noFuzzMat = ActorMaterializer(ActorMaterializerSettings(system).withFuzzing(false))
val te = TE("Test failure in preStart")
val evilLatch = new CountDownLatch(1)
/*
* This is a somewhat tricky test setup. We need the following conditions to be met:
* - the stage should fail its output port before the ExposedPublisher message is processed
* - the enclosing actor (and therefore the stage) should be kept alive until a stray SubscribePending arrives
* that has been enqueued after ExposedPublisher message has been enqueued, but before it has been processed
*
* To achieve keeping alive the stage for long enough, we use an extra input and output port and instead
* of failing the stage, we fail only the output port under test.
*
* To delay the startup long enough, so both ExposedPublisher and SubscribePending are enqueued, we use an evil
* latch to delay the preStart() (which in turn delays the enclosing actor's preStart).
*
*/
val failyStage = new GraphStage[FanOutShape2[Int, Int, Int]] {
override val shape: FanOutShape2[Int, Int, Int] = new FanOutShape2(
Inlet[Int]("test.in"),
Outlet[Int]("test.out0"),
Outlet[Int]("test.out1"))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def preStart(): Unit = {
pull(shape.in)
evilLatch.await()
fail(shape.out0, te)
}
setHandler(shape.out0, ignoreTerminateOutput) //We fail in preStart anyway
setHandler(shape.out1, ignoreTerminateOutput) //We fail in preStart anyway
passAlong(shape.in, shape.out1)
}
}
val downstream0 = TestSubscriber.probe[Int]()
val downstream1 = TestSubscriber.probe[Int]()
val upstream = TestPublisher.probe[Int]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val faily = b.add(failyStage)
Source.fromPublisher(upstream) ~> faily.in
faily.out0 ~> Sink.fromSubscriber(downstream0)
faily.out1 ~> Sink.fromSubscriber(downstream1)
ClosedShape
}).run()(noFuzzMat)
evilLatch.countDown()
downstream0.expectSubscriptionAndError(te)
// If an NPE would happen due to unset exposedPublisher (see #19338) this would receive a failure instead
// of the actual element
downstream1.request(1)
upstream.sendNext(42)
downstream1.expectNext(42)
upstream.sendComplete()
downstream1.expectComplete()
}
}
}

View file

@ -269,12 +269,12 @@ private[stream] object ActorGraphInterpreter {
}
def exposedPublisher(publisher: ActorPublisher[Any]): Unit = {
exposedPublisher = publisher
upstreamFailed match {
case _: Some[_]
publisher.shutdown(upstreamFailed)
case _
if (upstreamCompleted) publisher.shutdown(None)
else exposedPublisher = publisher
}
}