=str #19469 More verbose logging of stage and tcp errors

* Logs all exceptions thrown in GraphStages at error level
* Logs any exception leading to an abort of a tcp connection at debug level
This commit is contained in:
Johan Andrén 2016-01-29 15:13:18 +01:00
parent a6aee310ba
commit 2ed7911d17
5 changed files with 24 additions and 20 deletions

View file

@ -467,7 +467,9 @@ private[stream] final class GraphInterpreter(
logic.beforePreStart()
logic.preStart()
} catch {
case NonFatal(e) logic.failStage(e)
case NonFatal(e)
log.error(e, "Error during preStart in [{}]", assembly.stages(logic.stageId))
logic.failStage(e)
}
afterStageHasRun(logic)
i += 1
@ -534,7 +536,12 @@ private[stream] final class GraphInterpreter(
catch {
case NonFatal(e)
if (activeStage == null) throw e
else activeStage.failStage(e, isInternal = true)
else {
val stage = assembly.stages(activeStage.stageId)
log.error(e, "Error in stage [{}]: {}", stage, e.getMessage)
activeStage.failStage(e)
}
}
afterStageHasRun(activeStage)
eventsRemaining -= 1
@ -669,7 +676,7 @@ private[stream] final class GraphInterpreter(
logic.afterPostStop()
} catch {
case NonFatal(e)
log.error(e, s"Error during postStop in [${assembly.stages(logic.stageId)}]")
log.error(e, s"Error during postStop in [{}]: {}", assembly.stages(logic.stageId), e.getMessage)
}
}
@ -698,7 +705,7 @@ private[stream] final class GraphInterpreter(
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
}
private[stream] def fail(connection: Int, ex: Throwable, isInternal: Boolean): Unit = {
private[stream] def fail(connection: Int, ex: Throwable): Unit = {
val currentState = portStates(connection)
if (Debug) println(s"$Name fail($connection, $ex) [$currentState]")
portStates(connection) = currentState | OutClosed
@ -706,8 +713,6 @@ private[stream] final class GraphInterpreter(
portStates(connection) = currentState | (OutClosed | InFailed)
connectionSlots(connection) = Failed(ex, connectionSlots(connection))
if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection)
} else if (isInternal) {
log.error(ex, "Error after stage was closed.")
}
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
}