Merge pull request #19647 from akka/wip-19469-better-stream-error-logging-johanandren

=str #19469 Log all graph stage exceptions
This commit is contained in:
drewhk 2016-02-01 17:18:12 +01:00
commit c6eb88e334
5 changed files with 24 additions and 20 deletions

View file

@ -5,6 +5,7 @@ akka {
serialize-messages = on
default-dispatcher.throughput = 1 // Amplify the effects of fuzzing
}
akka.actor.warn-about-java-serializer-usage = false
stream.materializer.debug.fuzzing-mode = on
}

View file

@ -252,7 +252,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec {
}
}
EventFilter[IllegalArgumentException](message = "Error after stage was closed.", occurrences = 1).intercept {
EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept {
Await.result(Source.fromGraph(failyStage).runWith(Sink.ignore), 3.seconds)
}

View file

@ -467,7 +467,9 @@ private[stream] final class GraphInterpreter(
logic.beforePreStart()
logic.preStart()
} catch {
case NonFatal(e) logic.failStage(e)
case NonFatal(e)
log.error(e, "Error during preStart in [{}]", assembly.stages(logic.stageId))
logic.failStage(e)
}
afterStageHasRun(logic)
i += 1
@ -534,7 +536,12 @@ private[stream] final class GraphInterpreter(
catch {
case NonFatal(e)
if (activeStage == null) throw e
else activeStage.failStage(e, isInternal = true)
else {
val stage = assembly.stages(activeStage.stageId)
log.error(e, "Error in stage [{}]: {}", stage, e.getMessage)
activeStage.failStage(e)
}
}
afterStageHasRun(activeStage)
eventsRemaining -= 1
@ -669,7 +676,7 @@ private[stream] final class GraphInterpreter(
logic.afterPostStop()
} catch {
case NonFatal(e)
log.error(e, s"Error during postStop in [${assembly.stages(logic.stageId)}]")
log.error(e, s"Error during postStop in [{}]: {}", assembly.stages(logic.stageId), e.getMessage)
}
}
@ -698,7 +705,7 @@ private[stream] final class GraphInterpreter(
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
}
private[stream] def fail(connection: Int, ex: Throwable, isInternal: Boolean): Unit = {
private[stream] def fail(connection: Int, ex: Throwable): Unit = {
val currentState = portStates(connection)
if (Debug) println(s"$Name fail($connection, $ex) [$currentState]")
portStates(connection) = currentState | OutClosed
@ -706,8 +713,6 @@ private[stream] final class GraphInterpreter(
portStates(connection) = currentState | (OutClosed | InFailed)
connectionSlots(connection) = Failed(ex, connectionSlots(connection))
if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection)
} else if (isInternal) {
log.error(ex, "Error after stage was closed.")
}
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
}

View file

@ -266,8 +266,14 @@ private[stream] object TcpConnectionStage {
}
override def onUpstreamFailure(ex: Throwable): Unit = {
if (connection != null) connection ! Abort
else failStage(ex)
if (connection != null) {
if (interpreter.log.isDebugEnabled) {
interpreter.log.debug("Aborting tcp connection because of upstream failure: {}\n{}",
ex.getMessage,
ex.getStackTrace.mkString("\n"))
}
connection ! Abort
} else failStage(ex)
}
})

View file

@ -463,7 +463,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
/**
* Signals failure through the given port.
*/
final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex, isInternal = false)
final protected def fail[T](out: Outlet[T], ex: Throwable): Unit = interpreter.fail(conn(out), ex)
/**
* Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called,
@ -487,21 +487,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* Automatically invokes [[cancel()]] or [[fail()]] on all the input or output ports that have been called,
* then stops the stage, then [[postStop()]] is called.
*/
final def failStage(ex: Throwable): Unit = failStage(ex, isInternal = false)
/**
* INTERNAL API
*
* Used to signal errors caught by the interpreter itself. This method logs failures if the stage has been
* already closed if ``isInternal`` is set to true.
*/
private[stream] final def failStage(ex: Throwable, isInternal: Boolean): Unit = {
final def failStage(ex: Throwable): Unit = {
var i = 0
while (i < portToConn.length) {
if (i < inCount)
interpreter.cancel(portToConn(i))
else
interpreter.fail(portToConn(i), ex, isInternal)
interpreter.fail(portToConn(i), ex)
i += 1
}
setKeepGoing(false)