From df089016faa6e7ba6192a2d456a5d56d0c33d94c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 11 Feb 2019 13:35:38 +0100 Subject: [PATCH] Extract the layouts of the running streams as an AST (#25831) --- .../akka/stream/FlatMapConcatBenchmark.scala | 31 +-- .../scala/akka/stream/MapAsyncBenchmark.scala | 29 +-- .../akka/stream/PartitionHubBenchmark.scala | 25 +-- .../testkit/scaladsl/StreamTestKit.scala | 114 +++++++++- .../akka/stream/testkit/StreamSpec.scala | 19 +- .../snapshot/MaterializerStateSpec.scala | 34 +++ .../mima-filters/2.5.20.backwards.excludes | 5 + .../stream/impl/ActorMaterializerImpl.scala | 2 - .../impl/fusing/ActorGraphInterpreter.scala | 69 ++---- .../stream/impl/fusing/GraphInterpreter.scala | 71 +++---- .../stream/snapshot/MaterializerState.scala | 201 ++++++++++++++++++ 11 files changed, 433 insertions(+), 167 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlatMapConcatBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlatMapConcatBenchmark.scala index 30423999cc..bae6344828 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlatMapConcatBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlatMapConcatBenchmark.scala @@ -4,23 +4,19 @@ package akka.stream -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import scala.concurrent.Await -import scala.concurrent.duration._ +import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.NotUsed import akka.actor.ActorSystem -import akka.remote.artery.BenchTestSource -import akka.remote.artery.LatchSink -import akka.stream.impl.PhasedFusingActorMaterializer -import akka.stream.impl.StreamSupervisor +import akka.remote.artery.{ BenchTestSource, LatchSink } +import akka.stream.impl.fusing.GraphStages import akka.stream.scaladsl._ -import akka.testkit.TestProbe +import akka.stream.testkit.scaladsl.StreamTestKit import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ -import akka.stream.impl.fusing.GraphStages + +import scala.concurrent.Await +import scala.concurrent.duration._ object FlatMapConcatBenchmark { final val OperationsPerInvocation = 100000 @@ -112,19 +108,10 @@ class FlatMapConcatBenchmark { private def awaitLatch(latch: CountDownLatch): Unit = { if (!latch.await(30, TimeUnit.SECONDS)) { - dumpMaterializer() + implicit val ec = materializer.system.dispatcher + StreamTestKit.printDebugDump(materializer.supervisor) throw new RuntimeException("Latch didn't complete in time") } } - private def dumpMaterializer(): Unit = { - materializer match { - case impl: PhasedFusingActorMaterializer ⇒ - val probe = TestProbe()(system) - impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) - val children = probe.expectMsgType[StreamSupervisor.Children].children - children.foreach(_ ! 
StreamSupervisor.PrintDebugDump) - } - } - } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MapAsyncBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MapAsyncBenchmark.scala index 7a80b1eb15..6dfd6aa041 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MapAsyncBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MapAsyncBenchmark.scala @@ -4,24 +4,19 @@ package akka.stream -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import scala.concurrent.Await -import scala.concurrent.Future -import scala.concurrent.duration._ +import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.NotUsed import akka.actor.ActorSystem -import akka.remote.artery.BenchTestSource -import akka.remote.artery.LatchSink -import akka.stream.impl.PhasedFusingActorMaterializer -import akka.stream.impl.StreamSupervisor +import akka.remote.artery.{ BenchTestSource, LatchSink } import akka.stream.scaladsl._ -import akka.testkit.TestProbe +import akka.stream.testkit.scaladsl.StreamTestKit import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + object MapAsyncBenchmark { final val OperationsPerInvocation = 100000 } @@ -95,19 +90,9 @@ class MapAsyncBenchmark { private def awaitLatch(latch: CountDownLatch): Unit = { if (!latch.await(30, TimeUnit.SECONDS)) { - dumpMaterializer() + StreamTestKit.printDebugDump(materializer.supervisor) throw new RuntimeException("Latch didn't complete in time") } } - private def dumpMaterializer(): Unit = { - materializer match { - case impl: PhasedFusingActorMaterializer ⇒ - val probe = TestProbe()(system) - impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) - val children = probe.expectMsgType[StreamSupervisor.Children].children - children.foreach(_ ! 
StreamSupervisor.PrintDebugDump) - } - } - } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala index 61319b7ba3..68be1a7813 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/PartitionHubBenchmark.scala @@ -4,22 +4,18 @@ package akka.stream -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ CountDownLatch, TimeUnit } + import akka.NotUsed import akka.actor.ActorSystem -import akka.stream.scaladsl._ +import akka.remote.artery.{ BenchTestSource, FixedSizePartitionHub, LatchSink } +import akka.stream.scaladsl.{ PartitionHub, _ } +import akka.stream.testkit.scaladsl.StreamTestKit import com.typesafe.config.ConfigFactory import org.openjdk.jmh.annotations._ + import scala.concurrent.Await import scala.concurrent.duration._ -import akka.remote.artery.BenchTestSource -import java.util.concurrent.CountDownLatch -import akka.remote.artery.LatchSink -import akka.stream.impl.PhasedFusingActorMaterializer -import akka.testkit.TestProbe -import akka.stream.impl.StreamSupervisor -import akka.stream.scaladsl.PartitionHub -import akka.remote.artery.FixedSizePartitionHub object PartitionHubBenchmark { final val OperationsPerInvocation = 100000 @@ -112,13 +108,8 @@ class PartitionHubBenchmark { } private def dumpMaterializer(): Unit = { - materializer match { - case impl: PhasedFusingActorMaterializer ⇒ - val probe = TestProbe()(system) - impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) - val children = probe.expectMsgType[StreamSupervisor.Children].children - children.foreach(_ ! StreamSupervisor.PrintDebugDump) - } + implicit val ec = materializer.system.dispatcher + StreamTestKit.printDebugDump(materializer.supervisor) } } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/StreamTestKit.scala index c4f043be46..17912fc335 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/StreamTestKit.scala @@ -6,11 +6,13 @@ package akka.stream.testkit.scaladsl import akka.actor.{ ActorRef, ActorSystem } import akka.annotation.InternalApi -import akka.stream.Materializer +import akka.stream._ import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } +import akka.stream.snapshot._ import akka.testkit.TestProbe import scala.concurrent.duration._ +import scala.concurrent.{ Await, ExecutionContext } object StreamTestKit { @@ -42,21 +44,121 @@ object StreamTestKit { @InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = { val probe = TestProbe()(sys) probe.within(5.seconds) { - var children = Set.empty[ActorRef] try probe.awaitAssert { supervisor.tell(StreamSupervisor.GetChildren, probe.ref) - children = probe.expectMsgType[StreamSupervisor.Children].children + val children = probe.expectMsgType[StreamSupervisor.Children].children assert( children.isEmpty, s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") - } - catch { + } catch { case ex: Throwable ⇒ - children.foreach(_ ! 
StreamSupervisor.PrintDebugDump) + import sys.dispatcher + printDebugDump(supervisor) throw ex } } } + /** INTERNAL API */ + @InternalApi private[akka] def printDebugDump(streamSupervisor: ActorRef)(implicit ec: ExecutionContext): Unit = { + val doneDumping = MaterializerState.requestFromSupervisor(streamSupervisor) + .map(snapshots ⇒ + snapshots.foreach(s ⇒ println(snapshotString(s.asInstanceOf[StreamSnapshotImpl])) + )) + Await.result(doneDumping, 5.seconds) + } + + /** INTERNAL API */ + @InternalApi private[testkit] def snapshotString(snapshot: StreamSnapshotImpl): String = { + val builder = StringBuilder.newBuilder + builder.append(s"activeShells (actor: ${snapshot.self}):\n") + snapshot.activeInterpreters.foreach { shell ⇒ + builder.append(" ") + appendShellSnapshot(builder, shell) + builder.append("\n") + appendInterpreterSnapshot(builder, shell.asInstanceOf[RunningInterpreterImpl]) + builder.append("\n") + } + builder.append(s"newShells:\n") + snapshot.newShells.foreach { shell ⇒ + builder.append(" ") + appendShellSnapshot(builder, shell) + builder.append("\n") + builder.append(" Not initialized") + builder.append("\n") + } + builder.toString + } + + private def appendShellSnapshot(builder: StringBuilder, shell: InterpreterSnapshot): Unit = { + builder.append("GraphInterpreterShell(\n logics: [\n") + val logicsToPrint = shell.logics + logicsToPrint.foreach { logic ⇒ + builder.append(" ") + .append(logic.label) + .append(" attrs: [") + .append(logic.attributes.attributeList.mkString(", ")) + .append("],\n") + } + builder.setLength(builder.length - 2) + shell match { + case running: RunningInterpreter ⇒ + builder.append("\n ],\n connections: [\n") + running.connections.foreach { connection ⇒ + builder.append(" ") + .append("Connection(") + .append(connection.asInstanceOf[ConnectionSnapshotImpl].id) + .append(", ") + .append(connection.in.label) + .append(", ") + .append(connection.out.label) + .append(", ") + .append(connection.state) + .append(")\n") + } + builder.setLength(builder.length - 2) + + case _ ⇒ + } + builder.append("\n ]\n)") + builder.toString() + } + + private def appendInterpreterSnapshot(builder: StringBuilder, snapshot: RunningInterpreterImpl): Unit = { + try { + builder.append("\ndot format graph for deadlock analysis:\n") + builder.append("================================================================\n") + builder.append("digraph waits {\n") + + for (i ← snapshot.logics.indices) { + val logic = snapshot.logics(i) + builder.append(s""" N$i [label="${logic.label}"];""").append('\n') + } + + for (connection ← snapshot.connections) { + val inName = "N" + connection.in.asInstanceOf[LogicSnapshotImpl].index + val outName = "N" + connection.out.asInstanceOf[LogicSnapshotImpl].index + + builder.append(s" $inName -> $outName ") + connection.state match { + case ConnectionSnapshot.ShouldPull ⇒ + builder.append("[label=shouldPull, color=blue];") + case ConnectionSnapshot.ShouldPush ⇒ + builder.append(s"[label=shouldPush, color=red];") + case ConnectionSnapshot.Closed ⇒ + builder.append("[style=dotted, label=closed, dir=both];") + case _ ⇒ + } + builder.append("\n") + } + + builder.append("}\n================================================================\n") + builder.append(s"// ${snapshot.queueStatus} (running=${snapshot.runningLogicsCount}, shutdown=${snapshot.stoppedLogics.mkString(",")})") + builder.toString() + } catch { + case _: NoSuchElementException ⇒ builder.append("Not all logics has a stage listed, cannot create graph") + } + } + } diff --git 
a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala index 5b9c82939f..369005d2ec 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala @@ -4,11 +4,14 @@ package akka.stream.testkit -import akka.actor.{ ActorSystem, ActorRef } +import akka.actor.{ ActorRef, ActorSystem } import akka.stream.impl.StreamSupervisor +import akka.stream.snapshot.{ MaterializerState, StreamSnapshotImpl } import akka.testkit.{ AkkaSpec, TestProbe } -import com.typesafe.config.{ ConfigFactory, Config } +import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.Failed + +import scala.concurrent.Future import scala.concurrent.duration._ class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) { @@ -26,8 +29,12 @@ class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) { override def withFixture(test: NoArgTest) = { super.withFixture(test) match { case failed: Failed ⇒ + implicit val ec = system.dispatcher val probe = TestProbe()(system) - system.actorSelection("/user/" + StreamSupervisor.baseName + "*").tell(StreamSupervisor.GetChildren, probe.ref) + // FIXME I don't think it always runs under /user anymore (typed) + // FIXME correction - I'm not sure this works at _all_ - supposed to dump stream state if test fails + val streamSupervisors = system.actorSelection("/user/" + StreamSupervisor.baseName + "*") + streamSupervisors.tell(StreamSupervisor.GetChildren, probe.ref) val children: Seq[ActorRef] = probe.receiveWhile(2.seconds) { case StreamSupervisor.Children(children) ⇒ children }.flatten @@ -35,7 +42,11 @@ class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) { if (children.isEmpty) println("Stream is completed. No debug information is available") else { println("Stream actors alive: " + children) - children.foreach(_ ! StreamSupervisor.PrintDebugDump) + Future.sequence(children.map(MaterializerState.requestFromChild)) + .foreach(snapshots ⇒ + snapshots.foreach(s ⇒ + akka.stream.testkit.scaladsl.StreamTestKit.snapshotString(s.asInstanceOf[StreamSnapshotImpl])) + ) } failed case other ⇒ other diff --git a/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala new file mode 100644 index 0000000000..5d9042acc6 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. 
+ */ + +package akka.stream.snapshot + +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.testkit.StreamSpec + +import scala.concurrent.duration._ + +class MaterializerStateSpec extends StreamSpec { + + "The MaterializerSnapshotting" must { + + "snapshot a running stream" in { + implicit val mat = ActorMaterializer() + Source.maybe[Int] + .map(_.toString) + .zipWithIndex + .runWith(Sink.seq) + + awaitAssert({ + val snapshot = MaterializerState.streamSnapshots(mat).futureValue + + snapshot should have size (1) + snapshot.head.activeInterpreters should have size (1) + snapshot.head.activeInterpreters.head.logics should have size (4) // all 4 operators + }, 3.seconds) + } + } + +} diff --git a/akka-stream/src/main/mima-filters/2.5.20.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.20.backwards.excludes index 251f4161a3..ec9109be17 100644 --- a/akka-stream/src/main/mima-filters/2.5.20.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.20.backwards.excludes @@ -5,3 +5,8 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.OutputSt ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$DownstreamStatus") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$Ok$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$Canceled$") + +# AST for stream layout dump +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.StreamSupervisor$PrintDebugDump$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.dumpWaits") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.dumpWaits") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 6b132d5bab..8de2585045 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -168,8 +168,6 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa case object StopChildren /** Testing purpose */ case object StoppedChildren - /** Testing purpose */ - case object PrintDebugDump } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 49270164b6..0ddc37a1dc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -16,6 +16,7 @@ import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ } +import akka.stream.snapshot._ import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import akka.util.OptionVal import org.reactivestreams.{ Publisher, Subscriber, Subscription } @@ -31,6 +32,7 @@ import scala.util.control.NonFatal @InternalApi private[akka] object ActorGraphInterpreter { object Resume extends DeadLetterSuppression with NoSerializationVerificationNeeded + object Snapshot extends NoSerializationVerificationNeeded trait BoundaryEvent extends DeadLetterSuppression with 
NoSerializationVerificationNeeded { def shell: GraphInterpreterShell @@ -512,8 +514,6 @@ import scala.util.control.NonFatal private var inputs: List[BatchingActorInputBoundary] = Nil private var outputs: List[ActorOutputBoundary] = Nil - def dumpWaits(): Unit = interpreter.dumpWaits() - /* * Limits the number of events processed by the interpreter before scheduling * a self-message for fairness with other actors. The basic assumption here is @@ -637,30 +637,15 @@ import scala.util.control.NonFatal } } - override def toString: String = { - val builder = StringBuilder.newBuilder - builder.append("GraphInterpreterShell(\n logics: [\n") - val logicsToPrint = if (isInitialized) interpreter.logics else logics - logicsToPrint.foreach { logic ⇒ - builder.append(" ") - .append(logic.originalStage.getOrElse(logic).toString) - .append(" attrs: [") - .append(logic.attributes.attributeList.mkString(", ")) - .append("],\n") - } - builder.setLength(builder.length - 2) - if (isInitialized) { - builder.append("\n ],\n connections: [\n") - interpreter.connections.foreach { connection ⇒ - builder - .append(" ") - .append(if (connection == null) "null" else connection.toString) - .append(",\n") - } - builder.setLength(builder.length - 2) - } - builder.append("\n ]\n)") - builder.toString() + def toSnapshot: InterpreterSnapshot = { + if (!isInitialized) + UninitializedInterpreterImpl( + logics.zipWithIndex.map { + case (logic, idx) ⇒ + LogicSnapshotImpl(idx, logic.originalStage.getOrElse(logic).toString, logic.attributes) + }.toVector + ) + else interpreter.toSnapshot } } @@ -764,32 +749,12 @@ import scala.util.control.NonFatal currentLimit = eventLimit if (shortCircuitBuffer != null) shortCircuitBatch() - case StreamSupervisor.PrintDebugDump ⇒ - val builder = new java.lang.StringBuilder(s"activeShells (actor: $self):\n") - activeInterpreters.foreach { shell ⇒ - builder.append(" ") - .append(shell.toString.replace("\n", "\n ")) - .append("\n") - if (shell.isInitialized) { - builder.append(shell.interpreter.toString) - } else { - builder.append(" Not initialized") - } - builder.append("\n") - } - builder.append(s"newShells:\n") - newShells.foreach { shell ⇒ - builder.append(" ") - .append(shell.toString.replace("\n", "\n ")) - .append("\n") - if (shell.isInitialized) { - builder.append(shell.interpreter.toString) - } else { - builder.append(" Not initialized") - } - builder.append("\n") - } - println(builder) + case Snapshot ⇒ + sender() ! 
StreamSnapshotImpl( + self.path, + activeInterpreters.map(shell ⇒ shell.toSnapshot.asInstanceOf[RunningInterpreter]).toSeq, + newShells.map(shell ⇒ shell.toSnapshot.asInstanceOf[UninitializedInterpreter]) + ) } override def postStop(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index dda801bea8..23615fe42a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -15,8 +15,8 @@ import akka.annotation.InternalApi import scala.concurrent.Promise import scala.util.control.NonFatal - import akka.stream.Attributes.LogLevels +import akka.stream.snapshot._ /** * INTERNAL API @@ -85,10 +85,6 @@ import akka.stream.Attributes.LogLevels var outHandler: OutHandler) { var portState: Int = InReady var slot: Any = Empty - - override def toString = - if (GraphInterpreter.Debug) s"Connection($id, $inOwner, $outOwner, $inHandler, $outHandler, $portState, $slot)" - else s"Connection($id, $portState, $slot, $inHandler, $outHandler)" } private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] { @@ -640,49 +636,40 @@ import akka.stream.Attributes.LogLevels } /** - * Debug utility to dump the "waits-on" relationships in DOT format to the console for analysis of deadlocks. - * Use dot/graphviz to render graph. + * Debug utility to dump the "waits-on" relationships in an AST format for rendering in some suitable format for + * analysis of deadlocks. * * Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very * simplistic tool, make sure you are understanding what you are doing and then it will serve you well. 
   */
-  def dumpWaits(): Unit = println(toString)
+  def toSnapshot: RunningInterpreter = {
 
-  override def toString: String = {
-    try {
-      val builder = new StringBuilder("\ndot format graph for deadlock analysis:\n")
-      builder.append("================================================================\n")
-      builder.append("digraph waits {\n")
-
-      for (i ← logics.indices) {
-        val logic = logics(i)
+    val logicSnapshots = logics.zipWithIndex.map {
+      case (logic, idx) ⇒
         val label = logic.originalStage.getOrElse(logic).toString
-        builder.append(s""" N$i [label="$label"];""").append('\n')
-      }
-
-      val logicIndexes = logics.zipWithIndex.map { case (stage, idx) ⇒ stage → idx }.toMap
-      for (connection ← connections if connection != null) {
-        val inName = "N" + logicIndexes(connection.inOwner)
-        val outName = "N" + logicIndexes(connection.outOwner)
-
-        builder.append(s" $inName -> $outName ")
-        connection.portState match {
-          case InReady ⇒
-            builder.append("[label=shouldPull, color=blue];")
-          case OutReady ⇒
-            builder.append(s"[label=shouldPush, color=red];")
-          case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒
-            builder.append("[style=dotted, label=closed, dir=both];")
-          case _ ⇒
-        }
-        builder.append("\n")
-      }
-
-      builder.append("}\n================================================================\n")
-      builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})")
-      builder.toString()
-    } catch {
-      case _: NoSuchElementException ⇒ "Not all logics has a stage listed, cannot create graph"
+        LogicSnapshotImpl(idx, label, logic.attributes)
     }
+    val logicIndexes = logics.zipWithIndex.map { case (stage, idx) ⇒ stage → idx }.toMap
+    val connectionSnapshots = connections.filter(_ != null)
+      .map { connection ⇒
+        ConnectionSnapshotImpl(
+          connection.id,
+          logicSnapshots(logicIndexes(connection.inOwner)),
+          logicSnapshots(logicIndexes(connection.outOwner)),
+          connection.portState match {
+            case InReady ⇒ ConnectionSnapshot.ShouldPull
+            case OutReady ⇒ ConnectionSnapshot.ShouldPush
+            case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒ ConnectionSnapshot.Closed
+          }
+        )
+      }
+
+    RunningInterpreterImpl(
+      logicSnapshots.toVector,
+      connectionSnapshots.toVector,
+      queueStatus,
+      runningStages,
+      shutdownCounter.toList.map(n ⇒ logicSnapshots(n)))
   }
+
 }
diff --git a/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala
new file mode 100644
index 0000000000..751a3b2ea2
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala
@@ -0,0 +1,201 @@
+/*
+ * Copyright (C) 2009-2019 Lightbend Inc.
+ */
+
+package akka.stream.snapshot
+
+import akka.actor.{ ActorPath, ActorRef }
+import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
+import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
+import akka.pattern.ask
+import akka.stream.{ Attributes, Materializer }
+import akka.stream.impl.fusing.ActorGraphInterpreter
+import akka.util.Timeout
+
+import scala.collection.immutable
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.concurrent.duration._
+
+/**
+ * Debug utility to dump the running streams of a materializer in a structure describing the graph layout
+ * and "waits-on" relationships.
+ *
+ * Some of the data extracted may be off unless the stream has settled, for example when deadlocked, but the
+ * structure should be valid regardless. Extracting the information often will have an impact on the performance
+ * of the running streams.
+ */
+object MaterializerState {
+
+  /**
+   * Dump stream snapshots of all streams of the given materializer.
+   */
+  @ApiMayChange
+  def streamSnapshots(mat: Materializer): Future[immutable.Seq[StreamSnapshot]] = {
+    mat match {
+      case impl: PhasedFusingActorMaterializer ⇒
+        import impl.system.dispatcher
+        requestFromSupervisor(impl.supervisor)
+    }
+  }
+
+  /** INTERNAL API */
+  @InternalApi
+  private[akka] def requestFromSupervisor(supervisor: ActorRef)(implicit ec: ExecutionContext): Future[immutable.Seq[StreamSnapshot]] = {
+    // FIXME arbitrary timeout
+    implicit val timeout: Timeout = 10.seconds
+    (supervisor ? StreamSupervisor.GetChildren)
+      .mapTo[StreamSupervisor.Children]
+      .flatMap(msg ⇒
+        Future.sequence(msg.children.toVector.map(requestFromChild))
+      )
+  }
+
+  /** INTERNAL API */
+  @InternalApi
+  private[akka] def requestFromChild(child: ActorRef)(implicit ec: ExecutionContext): Future[StreamSnapshot] = {
+    // FIXME arbitrary timeout
+    implicit val timeout: Timeout = 10.seconds
+    (child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot]
+  }
+
+}
+
+/**
+ * A snapshot of one running stream
+ *
+ * Not for user extension
+ */
+@DoNotInherit @ApiMayChange
+sealed trait StreamSnapshot {
+  /**
+   * Running interpreters
+   */
+  def activeInterpreters: Seq[RunningInterpreter]
+
+  /**
+   * Interpreters that have been created but not yet initialized - the stream is not yet running
+   */
+  def newShells: Seq[UninitializedInterpreter]
+}
+
+/**
+ * A snapshot of one interpreter - contains a set of logics running in the same underlying actor. Note that
+ * multiple interpreters may be running in the same actor (because of submaterialization)
+ *
+ * Not for user extension
+ */
+@DoNotInherit @ApiMayChange
+sealed trait InterpreterSnapshot {
+  def logics: immutable.Seq[LogicSnapshot]
+}
+
+/**
+ * A stream interpreter that was not yet initialized when the snapshot was taken
+ *
+ * Not for user extension
+ */
+@DoNotInherit @ApiMayChange
+sealed trait UninitializedInterpreter extends InterpreterSnapshot
+
+/**
+ * A stream interpreter that is running/has been started
+ */
+@DoNotInherit @ApiMayChange
+sealed trait RunningInterpreter extends InterpreterSnapshot {
+  /**
+   * Each of the materialized graph stage logics running inside the interpreter
+   */
+  def logics: immutable.Seq[LogicSnapshot]
+  /**
+   * Each connection between logics in the interpreter
+   */
+  def connections: immutable.Seq[ConnectionSnapshot]
+
+  /**
+   * Total number of non-stopped logics in the interpreter
+   */
+  def runningLogicsCount: Int
+
+  /**
+   * All logics that have completed and are no longer executing
+   */
+  def stoppedLogics: immutable.Seq[LogicSnapshot]
+}
+
+/**
+ *
+ * Not for user extension
+ */
+@DoNotInherit @ApiMayChange
+sealed trait LogicSnapshot {
+  def label: String
+  def attributes: Attributes
+}
+
+@ApiMayChange
+object ConnectionSnapshot {
+  sealed trait ConnectionState
+  case object ShouldPull extends ConnectionState
+  case object ShouldPush extends ConnectionState
+  case object Closed extends ConnectionState
+}
+
+/**
+ * Not for user extension
+ */
+@DoNotInherit @ApiMayChange
+sealed trait ConnectionSnapshot {
+  def in: LogicSnapshot
+  def out: LogicSnapshot
+  def state: ConnectionSnapshot.ConnectionState
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+final private[akka] case class StreamSnapshotImpl(
+  self: ActorPath,
+  activeInterpreters: Seq[RunningInterpreter],
+  newShells:
Seq[UninitializedInterpreter]) extends StreamSnapshot with HideImpl + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class UninitializedInterpreterImpl(logics: immutable.Seq[LogicSnapshot]) extends UninitializedInterpreter + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class RunningInterpreterImpl( + logics: immutable.Seq[LogicSnapshot], + connections: immutable.Seq[ConnectionSnapshot], + queueStatus: String, + runningLogicsCount: Int, + stoppedLogics: immutable.Seq[LogicSnapshot]) extends RunningInterpreter with HideImpl + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class LogicSnapshotImpl(index: Int, label: String, attributes: Attributes) extends LogicSnapshot with HideImpl + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class ConnectionSnapshotImpl( + id: Int, + in: LogicSnapshot, + out: LogicSnapshot, + state: ConnectionSnapshot.ConnectionState) extends ConnectionSnapshot with HideImpl + +/** + * INTERNAL API + */ +@InternalApi +trait HideImpl { + override def toString: String = super.toString.replaceFirst("Impl", "") +}
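
Usage note (illustration only, not part of the diff): a minimal sketch of how the snapshot API added above might be consumed outside the testkit, for example to log the layout of all running streams of a materializer. The object name, actor-system name and the dummy stream are placeholders; only MaterializerState.streamSnapshots, StreamSnapshot.activeInterpreters, RunningInterpreter.logics/connections and ConnectionSnapshot.state come from this patch.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.snapshot.{ ConnectionSnapshot, MaterializerState }

object SnapshotDumpSketch extends App {
  implicit val system: ActorSystem = ActorSystem("snapshot-sketch") // placeholder name
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // keep one stream running so there is something to snapshot
  Source.maybe[Int].runWith(Sink.ignore)

  // ask the materializer for a snapshot of every running stream and print
  // each interpreter's logics and the state of the connections between them
  MaterializerState.streamSnapshots(materializer).foreach { snapshots ⇒
    snapshots.foreach { snapshot ⇒
      snapshot.activeInterpreters.foreach { interpreter ⇒
        println(s"logics: ${interpreter.logics.map(_.label).mkString(", ")}")
        interpreter.connections.foreach { connection ⇒
          val state = connection.state match {
            case ConnectionSnapshot.ShouldPull ⇒ "should pull"
            case ConnectionSnapshot.ShouldPush ⇒ "should push"
            case ConnectionSnapshot.Closed     ⇒ "closed"
          }
          println(s"  ${connection.out.label} -> ${connection.in.label}: $state")
        }
      }
    }
  }
}

The same information is what StreamTestKit.printDebugDump above renders as a dot graph for deadlock analysis.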