From 42fdf15d22e8af89c7731a1d6c8344f4e515aa1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 9 Mar 2017 17:04:46 +0100 Subject: [PATCH] Restore interpreter debug output #22433 --- .../impl/PhasedFusingActorMaterializer.scala | 5 +- .../impl/fusing/ActorGraphInterpreter.scala | 26 +++++- .../stream/impl/fusing/GraphInterpreter.scala | 84 +++++++++---------- .../scala/akka/stream/stage/GraphStage.scala | 8 ++ 4 files changed, 77 insertions(+), 46 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index ff74cdbbc8..631eff49c9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -19,6 +19,7 @@ import akka.stream.impl.fusing.GraphInterpreter.Connection import akka.stream.impl.fusing._ import akka.stream.impl.io.{ TLSActor, TlsModule } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } +import akka.util.OptionVal import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.collection.immutable.Map @@ -555,8 +556,10 @@ final class GraphStageIsland( override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (GraphStageLogic, Any) = { // TODO: bail on unknown types val stageModule = mod.asInstanceOf[GraphStageModule[Shape, Any]] - val matAndLogic = stageModule.stage.createLogicAndMaterializedValue(attributes) + val stage = stageModule.stage + val matAndLogic = stage.createLogicAndMaterializedValue(attributes) val logic = matAndLogic._1 + logic.originalStage = OptionVal.Some(stage) logic.attributes = attributes logics.add(logic) logic.stageId = logics.size() - 1 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index cb89026539..7314a65b5b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -14,6 +14,7 @@ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } +import akka.util.OptionVal import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.tailrec @@ -594,7 +595,6 @@ final class GraphInterpreterShell( * - a new error is encountered */ def tryAbort(ex: Throwable): Unit = { - ex.printStackTrace() val reason = ex match { case s: SpecViolation ⇒ new IllegalStateException("Shutting down because of violation of the Reactive Streams specification.", s) @@ -620,8 +620,28 @@ final class GraphInterpreterShell( } } - // TODO: Fix debug string - override def toString: String = s"GraphInterpreterShell" // \n${assembly.toString.replace("\n", "\n ")}" + override def toString: String = { + val builder = StringBuilder.newBuilder + builder.append("GraphInterpreterShell(\n logics: [\n") + interpreter.logics.foreach { logic ⇒ + builder.append(" ") + .append(logic.originalStage.getOrElse(logic).toString) + .append(" attrs: [") + .append(logic.attributes.attributeList.mkString(", ")) + .append("],\n") + } + builder.setLength(builder.length - 2) + builder.append("\n ],\n connections: [\n") + interpreter.connections.foreach { connection ⇒ + builder + .append(" ") + .append(connection.toString) + .append(",\n") + } + builder.setLength(builder.length - 2) + builder.append("\n ]\n)") + builder.toString() + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 16eb18017a..975170df6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -8,6 +8,7 @@ import akka.event.LoggingAdapter import akka.stream.stage._ import akka.stream._ import java.util.concurrent.ThreadLocalRandom + import scala.util.control.NonFatal /** @@ -296,8 +297,7 @@ final class GraphInterpreter( logic.preStart() } catch { case NonFatal(e) ⇒ - e.printStackTrace() - // log.error(e, "Error during preStart in [{}]", assembly.stages(logic.stageId)) + log.error(e, "Error during preStart in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage) logic.failStage(e) } afterStageHasRun(logic) @@ -351,10 +351,7 @@ final class GraphInterpreter( def reportStageError(e: Throwable): Unit = { if (activeStage == null) throw e else { - // TODO: Get error reporting back - //val stage = stages(activeStage.stageId) - - log.error(e, "Error in stage [{}]: {}", activeStage, e.getMessage) + log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage) activeStage.failStage(e) // Abort chasing @@ -562,7 +559,7 @@ final class GraphInterpreter( logic.afterPostStop() } catch { case NonFatal(e) ⇒ - //log.error(e, s"Error during postStop in [{}]: {}", assembly.stages(logic.stageId), e.getMessage) + log.error(e, s"Error during postStop in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage) } } @@ -631,45 +628,48 @@ final class GraphInterpreter( /** * Debug utility to dump the "waits-on" relationships in DOT format to the console for analysis of deadlocks. + * Use dot/graphviz to render graph. * * Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very * simplistic tool, make sure you are understanding what you are doing and then it will serve you well. */ def dumpWaits(): Unit = println(toString) - // override def toString: String = { - // val builder = new StringBuilder("digraph waits {\n") - // - // for (i ← assembly.stages.indices) - // builder.append(s"""N$i [label="${assembly.stages(i)}"]""" + "\n") - // - // def nameIn(port: Int): String = { - // val owner = assembly.inOwners(port) - // if (owner == Boundary) "Out" + port - // else "N" + owner - // } - // - // def nameOut(port: Int): String = { - // val owner = assembly.outOwners(port) - // if (owner == Boundary) "In" + port - // else "N" + owner - // } - // - // for (i ← connections.indices) { - // connections(i).portState match { - // case InReady ⇒ - // builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""") - // case OutReady ⇒ - // builder.append(s""" ${nameOut(i)} -> ${nameIn(i)} [label=shouldPush; color=red];""") - // case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒ - // builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [style=dotted; label=closed dir=both];""") - // case _ ⇒ - // } - // builder.append("\n") - // } - // - // builder.append("}\n") - // builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") - // builder.toString() - // } + override def toString: String = { + try { + val builder = new StringBuilder("\ndot format graph for deadlock analysis:\n") + builder.append("================================================================\n") + builder.append("digraph waits {\n") + + for (i ← logics.indices) { + val logic = logics(i) + val label = logic.originalStage.getOrElse(logic).toString + builder.append(s""" N$i [label="$label"];""").append('\n') + } + + val logicIndexes = logics.zipWithIndex.map { case (stage, idx) ⇒ stage → idx }.toMap + for (connection ← connections) { + val inName = "N" + logicIndexes(connection.inOwner) + val outName = "N" + logicIndexes(connection.outOwner) + + builder.append(s" $inName -> $outName ") + connection.portState match { + case InReady ⇒ + builder.append("[label=shouldPull; color=blue];") + case OutReady ⇒ + builder.append(s"[label=shouldPush; color=red];") + case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒ + builder.append("[style=dotted; label=closed dir=both];") + case _ ⇒ + } + builder.append("\n") + } + + builder.append("}\n================================================================\n") + builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") + builder.toString() + } catch { + case _: NoSuchElementException ⇒ "Not all logics has a stage listed, cannot create graph" + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 5586cb75c7..68a101b1f8 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -21,6 +21,7 @@ import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration import akka.stream.actor.ActorSubscriberMessage import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes } +import akka.util.OptionVal abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] { @@ -228,6 +229,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ private[stream] var attributes: Attributes = Attributes.none + /** + * INTERNAL API + * + * If possible a link back to the stage that the logic was created with, used for debugging. + */ + private[stream] var originalStage: OptionVal[GraphStageWithMaterializedValue[_ <: Shape, _]] = OptionVal.None + /** * INTERNAL API */