From 6b4a4848c941649a97b949ee61aaae9d47a267ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 14 Dec 2015 16:42:43 +0100 Subject: [PATCH] =str #19067: Fixed error reporting after stage is closed. --- .../fusing/ActorGraphInterpreterSpec.scala | 25 +++++++++++++++++++ .../impl/fusing/GraphInterpreterSpecKit.scala | 9 ++++--- .../stream/impl/fusing/InterpreterSpec.scala | 2 +- .../impl/fusing/ActorGraphInterpreter.scala | 2 +- .../stream/impl/fusing/GraphInterpreter.scala | 11 +++++--- .../scala/akka/stream/stage/GraphStage.scala | 14 ++++++++--- 6 files changed, 50 insertions(+), 13 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index 8d3bb153b9..9a005bef44 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -8,6 +8,7 @@ import akka.stream.scaladsl._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils._ +import akka.testkit.EventFilter import scala.concurrent.Await import scala.concurrent.duration._ @@ -231,5 +232,29 @@ class ActorGraphInterpreterSpec extends AkkaSpec { Await.result(f2, 3.seconds) should ===(1 to 10) } + "be able to properly report errors if an error happens for an already completed stage" in { + + val failyStage = new GraphStage[SourceShape[Int]] { + override val shape: SourceShape[Int] = new SourceShape(Outlet[Int]("test.out")) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + + setHandler(shape.outlet, new OutHandler { + override def onPull(): Unit = { + completeStage() + // This cannot be propagated now since the stage is already closed + push(shape.outlet, -1) + } + }) + + } + } + + EventFilter[IllegalArgumentException](message = "Error after stage was closed.", occurrences = 1).intercept { + Await.result(Source.fromGraph(failyStage).runWith(Sink.ignore), 3.seconds) + } + + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index 478d515ee4..245e0a01f1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -11,9 +11,10 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, _ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils.TE import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly -import akka.event.NoLogging -trait GraphInterpreterSpecKit { +trait GraphInterpreterSpecKit extends AkkaSpec { + + val logger = Logging(system, "InterpreterSpecKit") abstract class Builder { private var _interpreter: GraphInterpreter = _ @@ -72,7 +73,7 @@ trait GraphInterpreterSpecKit { val (inHandlers, outHandlers, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, + _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, (_, _, _) ⇒ (), fuzzingMode = false) for ((upstream, i) ← upstreams.zipWithIndex) { @@ -90,7 +91,7 @@ trait GraphInterpreterSpecKit { def manualInit(assembly: GraphAssembly): Unit = { val (inHandlers, outHandlers, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, + _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, (_, _, _) ⇒ (), fuzzingMode = false) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 00c9ff1ba1..fe1c3cd34b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -558,7 +558,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { new InvalidAbsorbTermination)) { lastEvents() should be(Set.empty) - EventFilter.error("It is not allowed to call absorbTermination() from onDownstreamFinish.").intercept { + EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept { downstream.cancel() lastEvents() should be(Set(Cancel)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index fb6fc6e236..dc2c7900ed 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -157,7 +157,7 @@ private[stream] object ActorGraphInterpreter { if (!(upstreamCompleted || downstreamCanceled) && (upstream ne null)) { upstream.cancel() } - onError(e) + if (!isClosed(out)) onError(e) } def onComplete(): Unit = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 16f9abe539..09db604597 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -521,7 +521,7 @@ private[stream] final class GraphInterpreter( catch { case NonFatal(e) ⇒ if (activeStage == null) throw e - else activeStage.failStage(e) + else activeStage.failStage(e, isInternal = true) } afterStageHasRun(activeStage) eventsRemaining -= 1 @@ -671,13 +671,16 @@ private[stream] final class GraphInterpreter( if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) } - private[stream] def fail(connection: Int, ex: Throwable): Unit = { + private[stream] def fail(connection: Int, ex: Throwable, isInternal: Boolean): Unit = { val currentState = portStates(connection) if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") - portStates(connection) = currentState | (OutClosed | InFailed) - if ((currentState & InClosed) == 0) { + portStates(connection) = currentState | OutClosed + if ((currentState & (InClosed | OutClosed)) == 0) { + portStates(connection) = currentState | (OutClosed | InFailed) connectionSlots(connection) = Failed(ex, connectionSlots(connection)) if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection) + } else if (isInternal) { + log.error(ex, "Error after stage was closed.") } if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index a6f4c5854b..2a57396f85 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -485,7 +485,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Signals failure through the given port. */ - final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex) + final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex, isInternal = false) /** * Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, @@ -509,13 +509,21 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Automatically invokes [[cancel()]] or [[fail()]] on all the input or output ports that have been called, * then stops the stage, then [[postStop()]] is called. */ - final def failStage(ex: Throwable): Unit = { + final def failStage(ex: Throwable): Unit = failStage(ex, isInternal = false) + + /** + * INTERNAL API + * + * Used to signal errors caught by the interpreter itself. This method logs failures if the stage has been + * already closed if ``isInternal`` is set to true. + */ + private[stream] final def failStage(ex: Throwable, isInternal: Boolean): Unit = { var i = 0 while (i < portToConn.length) { if (i < inCount) interpreter.cancel(portToConn(i)) else - interpreter.fail(portToConn(i), ex) + interpreter.fail(portToConn(i), ex, isInternal) i += 1 } if (keepGoingAfterAllPortsClosed) interpreter.closeKeptAliveStageIfNeeded(stageId)