=str #19067: Fixed error reporting after stage is closed.

This commit is contained in:
Endre Sándor Varga 2015-12-14 16:42:43 +01:00
parent 3d20915cf4
commit 6b4a4848c9
6 changed files with 50 additions and 13 deletions

View file

@ -8,6 +8,7 @@ import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.testkit.EventFilter
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -231,5 +232,29 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
Await.result(f2, 3.seconds) should ===(1 to 10) Await.result(f2, 3.seconds) should ===(1 to 10)
} }
"be able to properly report errors if an error happens for an already completed stage" in {
val failyStage = new GraphStage[SourceShape[Int]] {
override val shape: SourceShape[Int] = new SourceShape(Outlet[Int]("test.out"))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(shape.outlet, new OutHandler {
override def onPull(): Unit = {
completeStage()
// This cannot be propagated now since the stage is already closed
push(shape.outlet, -1)
}
})
}
}
EventFilter[IllegalArgumentException](message = "Error after stage was closed.", occurrences = 1).intercept {
Await.result(Source.fromGraph(failyStage).runWith(Sink.ignore), 3.seconds)
}
}
} }
} }

View file

@ -11,9 +11,10 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, _
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils.TE import akka.stream.testkit.Utils.TE
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import akka.event.NoLogging
trait GraphInterpreterSpecKit { trait GraphInterpreterSpecKit extends AkkaSpec {
val logger = Logging(system, "InterpreterSpecKit")
abstract class Builder { abstract class Builder {
private var _interpreter: GraphInterpreter = _ private var _interpreter: GraphInterpreter = _
@ -72,7 +73,7 @@ trait GraphInterpreterSpecKit {
val (inHandlers, outHandlers, logics) = val (inHandlers, outHandlers, logics) =
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ()) assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ())
_interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics,
(_, _, _) (), fuzzingMode = false) (_, _, _) (), fuzzingMode = false)
for ((upstream, i) upstreams.zipWithIndex) { for ((upstream, i) upstreams.zipWithIndex) {
@ -90,7 +91,7 @@ trait GraphInterpreterSpecKit {
def manualInit(assembly: GraphAssembly): Unit = { def manualInit(assembly: GraphAssembly): Unit = {
val (inHandlers, outHandlers, logics) = val (inHandlers, outHandlers, logics) =
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ()) assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ())
_interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics,
(_, _, _) (), fuzzingMode = false) (_, _, _) (), fuzzingMode = false)
} }

View file

@ -558,7 +558,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
new InvalidAbsorbTermination)) { new InvalidAbsorbTermination)) {
lastEvents() should be(Set.empty) lastEvents() should be(Set.empty)
EventFilter.error("It is not allowed to call absorbTermination() from onDownstreamFinish.").intercept { EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept {
downstream.cancel() downstream.cancel()
lastEvents() should be(Set(Cancel)) lastEvents() should be(Set(Cancel))
} }

View file

@ -157,7 +157,7 @@ private[stream] object ActorGraphInterpreter {
if (!(upstreamCompleted || downstreamCanceled) && (upstream ne null)) { if (!(upstreamCompleted || downstreamCanceled) && (upstream ne null)) {
upstream.cancel() upstream.cancel()
} }
onError(e) if (!isClosed(out)) onError(e)
} }
def onComplete(): Unit = def onComplete(): Unit =

View file

@ -521,7 +521,7 @@ private[stream] final class GraphInterpreter(
catch { catch {
case NonFatal(e) case NonFatal(e)
if (activeStage == null) throw e if (activeStage == null) throw e
else activeStage.failStage(e) else activeStage.failStage(e, isInternal = true)
} }
afterStageHasRun(activeStage) afterStageHasRun(activeStage)
eventsRemaining -= 1 eventsRemaining -= 1
@ -671,13 +671,16 @@ private[stream] final class GraphInterpreter(
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
} }
private[stream] def fail(connection: Int, ex: Throwable): Unit = { private[stream] def fail(connection: Int, ex: Throwable, isInternal: Boolean): Unit = {
val currentState = portStates(connection) val currentState = portStates(connection)
if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") if (Debug) println(s"$Name fail($connection, $ex) [$currentState]")
portStates(connection) = currentState | (OutClosed | InFailed) portStates(connection) = currentState | OutClosed
if ((currentState & InClosed) == 0) { if ((currentState & (InClosed | OutClosed)) == 0) {
portStates(connection) = currentState | (OutClosed | InFailed)
connectionSlots(connection) = Failed(ex, connectionSlots(connection)) connectionSlots(connection) = Failed(ex, connectionSlots(connection))
if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection) if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection)
} else if (isInternal) {
log.error(ex, "Error after stage was closed.")
} }
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
} }

View file

@ -485,7 +485,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/** /**
* Signals failure through the given port. * Signals failure through the given port.
*/ */
final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex) final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex, isInternal = false)
/** /**
* Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, * Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called,
@ -509,13 +509,21 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Automatically invokes [[cancel()]] or [[fail()]] on all the input or output ports that have been called, * Automatically invokes [[cancel()]] or [[fail()]] on all the input or output ports that have been called,
* then stops the stage, then [[postStop()]] is called. * then stops the stage, then [[postStop()]] is called.
*/ */
final def failStage(ex: Throwable): Unit = { final def failStage(ex: Throwable): Unit = failStage(ex, isInternal = false)
/**
* INTERNAL API
*
* Used to signal errors caught by the interpreter itself. This method logs failures if the stage has been
* already closed if ``isInternal`` is set to true.
*/
private[stream] final def failStage(ex: Throwable, isInternal: Boolean): Unit = {
var i = 0 var i = 0
while (i < portToConn.length) { while (i < portToConn.length) {
if (i < inCount) if (i < inCount)
interpreter.cancel(portToConn(i)) interpreter.cancel(portToConn(i))
else else
interpreter.fail(portToConn(i), ex) interpreter.fail(portToConn(i), ex, isInternal)
i += 1 i += 1
} }
if (keepGoingAfterAllPortsClosed) interpreter.closeKeptAliveStageIfNeeded(stageId) if (keepGoingAfterAllPortsClosed) interpreter.closeKeptAliveStageIfNeeded(stageId)