Restore interpreter debug output #22433

This commit is contained in:
Johan Andrén 2017-03-09 17:04:46 +01:00 committed by GitHub
parent c8097e4876
commit 42fdf15d22
4 changed files with 77 additions and 46 deletions

View file

@ -19,6 +19,7 @@ import akka.stream.impl.fusing.GraphInterpreter.Connection
import akka.stream.impl.fusing._ import akka.stream.impl.fusing._
import akka.stream.impl.io.{ TLSActor, TlsModule } import akka.stream.impl.io.{ TLSActor, TlsModule }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import akka.util.OptionVal
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.collection.immutable.Map import scala.collection.immutable.Map
@ -555,8 +556,10 @@ final class GraphStageIsland(
override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (GraphStageLogic, Any) = { override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (GraphStageLogic, Any) = {
// TODO: bail on unknown types // TODO: bail on unknown types
val stageModule = mod.asInstanceOf[GraphStageModule[Shape, Any]] val stageModule = mod.asInstanceOf[GraphStageModule[Shape, Any]]
val matAndLogic = stageModule.stage.createLogicAndMaterializedValue(attributes) val stage = stageModule.stage
val matAndLogic = stage.createLogicAndMaterializedValue(attributes)
val logic = matAndLogic._1 val logic = matAndLogic._1
logic.originalStage = OptionVal.Some(stage)
logic.attributes = attributes logic.attributes = attributes
logics.add(logic) logics.add(logic)
logic.stageId = logics.size() - 1 logic.stageId = logics.size() - 1

View file

@ -14,6 +14,7 @@ import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ } import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import akka.util.OptionVal
import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.annotation.tailrec import scala.annotation.tailrec
@ -594,7 +595,6 @@ final class GraphInterpreterShell(
* - a new error is encountered * - a new error is encountered
*/ */
def tryAbort(ex: Throwable): Unit = { def tryAbort(ex: Throwable): Unit = {
ex.printStackTrace()
val reason = ex match { val reason = ex match {
case s: SpecViolation case s: SpecViolation
new IllegalStateException("Shutting down because of violation of the Reactive Streams specification.", s) new IllegalStateException("Shutting down because of violation of the Reactive Streams specification.", s)
@ -620,8 +620,28 @@ final class GraphInterpreterShell(
} }
} }
// TODO: Fix debug string override def toString: String = {
override def toString: String = s"GraphInterpreterShell" // \n${assembly.toString.replace("\n", "\n ")}" val builder = StringBuilder.newBuilder
builder.append("GraphInterpreterShell(\n logics: [\n")
interpreter.logics.foreach { logic
builder.append(" ")
.append(logic.originalStage.getOrElse(logic).toString)
.append(" attrs: [")
.append(logic.attributes.attributeList.mkString(", "))
.append("],\n")
}
builder.setLength(builder.length - 2)
builder.append("\n ],\n connections: [\n")
interpreter.connections.foreach { connection
builder
.append(" ")
.append(connection.toString)
.append(",\n")
}
builder.setLength(builder.length - 2)
builder.append("\n ]\n)")
builder.toString()
}
} }
/** /**

View file

@ -8,6 +8,7 @@ import akka.event.LoggingAdapter
import akka.stream.stage._ import akka.stream.stage._
import akka.stream._ import akka.stream._
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**
@ -296,8 +297,7 @@ final class GraphInterpreter(
logic.preStart() logic.preStart()
} catch { } catch {
case NonFatal(e) case NonFatal(e)
e.printStackTrace() log.error(e, "Error during preStart in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage)
// log.error(e, "Error during preStart in [{}]", assembly.stages(logic.stageId))
logic.failStage(e) logic.failStage(e)
} }
afterStageHasRun(logic) afterStageHasRun(logic)
@ -351,10 +351,7 @@ final class GraphInterpreter(
def reportStageError(e: Throwable): Unit = { def reportStageError(e: Throwable): Unit = {
if (activeStage == null) throw e if (activeStage == null) throw e
else { else {
// TODO: Get error reporting back log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage)
//val stage = stages(activeStage.stageId)
log.error(e, "Error in stage [{}]: {}", activeStage, e.getMessage)
activeStage.failStage(e) activeStage.failStage(e)
// Abort chasing // Abort chasing
@ -562,7 +559,7 @@ final class GraphInterpreter(
logic.afterPostStop() logic.afterPostStop()
} catch { } catch {
case NonFatal(e) case NonFatal(e)
//log.error(e, s"Error during postStop in [{}]: {}", assembly.stages(logic.stageId), e.getMessage) log.error(e, s"Error during postStop in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage)
} }
} }
@ -631,45 +628,48 @@ final class GraphInterpreter(
/** /**
* Debug utility to dump the "waits-on" relationships in DOT format to the console for analysis of deadlocks. * Debug utility to dump the "waits-on" relationships in DOT format to the console for analysis of deadlocks.
* Use dot/graphviz to render graph.
* *
* Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very * Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very
* simplistic tool, make sure you are understanding what you are doing and then it will serve you well. * simplistic tool, make sure you are understanding what you are doing and then it will serve you well.
*/ */
def dumpWaits(): Unit = println(toString) def dumpWaits(): Unit = println(toString)
// override def toString: String = { override def toString: String = {
// val builder = new StringBuilder("digraph waits {\n") try {
// val builder = new StringBuilder("\ndot format graph for deadlock analysis:\n")
// for (i assembly.stages.indices) builder.append("================================================================\n")
// builder.append(s"""N$i [label="${assembly.stages(i)}"]""" + "\n") builder.append("digraph waits {\n")
//
// def nameIn(port: Int): String = { for (i logics.indices) {
// val owner = assembly.inOwners(port) val logic = logics(i)
// if (owner == Boundary) "Out" + port val label = logic.originalStage.getOrElse(logic).toString
// else "N" + owner builder.append(s""" N$i [label="$label"];""").append('\n')
// } }
//
// def nameOut(port: Int): String = { val logicIndexes = logics.zipWithIndex.map { case (stage, idx) stage idx }.toMap
// val owner = assembly.outOwners(port) for (connection connections) {
// if (owner == Boundary) "In" + port val inName = "N" + logicIndexes(connection.inOwner)
// else "N" + owner val outName = "N" + logicIndexes(connection.outOwner)
// }
// builder.append(s" $inName -> $outName ")
// for (i connections.indices) { connection.portState match {
// connections(i).portState match { case InReady
// case InReady builder.append("[label=shouldPull; color=blue];")
// builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""") case OutReady
// case OutReady builder.append(s"[label=shouldPush; color=red];")
// builder.append(s""" ${nameOut(i)} -> ${nameIn(i)} [label=shouldPush; color=red];""") case x if (x | InClosed | OutClosed) == (InClosed | OutClosed)
// case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) builder.append("[style=dotted; label=closed dir=both];")
// builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [style=dotted; label=closed dir=both];""") case _
// case _ }
// } builder.append("\n")
// builder.append("\n") }
// }
// builder.append("}\n================================================================\n")
// builder.append("}\n") builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})")
// builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") builder.toString()
// builder.toString() } catch {
// } case _: NoSuchElementException "Not all logics has a stage listed, cannot create graph"
}
}
} }

View file

@ -21,6 +21,7 @@ import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.stream.actor.ActorSubscriberMessage import akka.stream.actor.ActorSubscriberMessage
import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes } import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes }
import akka.util.OptionVal
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] { abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {
@ -228,6 +229,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/ */
private[stream] var attributes: Attributes = Attributes.none private[stream] var attributes: Attributes = Attributes.none
/**
* INTERNAL API
*
* If possible a link back to the stage that the logic was created with, used for debugging.
*/
private[stream] var originalStage: OptionVal[GraphStageWithMaterializedValue[_ <: Shape, _]] = OptionVal.None
/** /**
* INTERNAL API * INTERNAL API
*/ */