Extract the layouts of the running streams as an AST (#25831)
This commit is contained in:
parent
1b98ae8601
commit
df089016fa
11 changed files with 433 additions and 167 deletions
|
|
@ -4,23 +4,19 @@
|
||||||
|
|
||||||
package akka.stream
|
package akka.stream
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import scala.concurrent.Await
|
|
||||||
import scala.concurrent.duration._
|
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.remote.artery.BenchTestSource
|
import akka.remote.artery.{ BenchTestSource, LatchSink }
|
||||||
import akka.remote.artery.LatchSink
|
import akka.stream.impl.fusing.GraphStages
|
||||||
import akka.stream.impl.PhasedFusingActorMaterializer
|
|
||||||
import akka.stream.impl.StreamSupervisor
|
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
import akka.testkit.TestProbe
|
import akka.stream.testkit.scaladsl.StreamTestKit
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import org.openjdk.jmh.annotations._
|
import org.openjdk.jmh.annotations._
|
||||||
import akka.stream.impl.fusing.GraphStages
|
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
object FlatMapConcatBenchmark {
|
object FlatMapConcatBenchmark {
|
||||||
final val OperationsPerInvocation = 100000
|
final val OperationsPerInvocation = 100000
|
||||||
|
|
@ -112,19 +108,10 @@ class FlatMapConcatBenchmark {
|
||||||
|
|
||||||
private def awaitLatch(latch: CountDownLatch): Unit = {
|
private def awaitLatch(latch: CountDownLatch): Unit = {
|
||||||
if (!latch.await(30, TimeUnit.SECONDS)) {
|
if (!latch.await(30, TimeUnit.SECONDS)) {
|
||||||
dumpMaterializer()
|
implicit val ec = materializer.system.dispatcher
|
||||||
|
StreamTestKit.printDebugDump(materializer.supervisor)
|
||||||
throw new RuntimeException("Latch didn't complete in time")
|
throw new RuntimeException("Latch didn't complete in time")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def dumpMaterializer(): Unit = {
|
|
||||||
materializer match {
|
|
||||||
case impl: PhasedFusingActorMaterializer ⇒
|
|
||||||
val probe = TestProbe()(system)
|
|
||||||
impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
|
|
||||||
val children = probe.expectMsgType[StreamSupervisor.Children].children
|
|
||||||
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,24 +4,19 @@
|
||||||
|
|
||||||
package akka.stream
|
package akka.stream
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import scala.concurrent.Await
|
|
||||||
import scala.concurrent.Future
|
|
||||||
import scala.concurrent.duration._
|
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.remote.artery.BenchTestSource
|
import akka.remote.artery.{ BenchTestSource, LatchSink }
|
||||||
import akka.remote.artery.LatchSink
|
|
||||||
import akka.stream.impl.PhasedFusingActorMaterializer
|
|
||||||
import akka.stream.impl.StreamSupervisor
|
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
import akka.testkit.TestProbe
|
import akka.stream.testkit.scaladsl.StreamTestKit
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import org.openjdk.jmh.annotations._
|
import org.openjdk.jmh.annotations._
|
||||||
|
|
||||||
|
import scala.concurrent.{ Await, Future }
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
object MapAsyncBenchmark {
|
object MapAsyncBenchmark {
|
||||||
final val OperationsPerInvocation = 100000
|
final val OperationsPerInvocation = 100000
|
||||||
}
|
}
|
||||||
|
|
@ -95,19 +90,9 @@ class MapAsyncBenchmark {
|
||||||
|
|
||||||
private def awaitLatch(latch: CountDownLatch): Unit = {
|
private def awaitLatch(latch: CountDownLatch): Unit = {
|
||||||
if (!latch.await(30, TimeUnit.SECONDS)) {
|
if (!latch.await(30, TimeUnit.SECONDS)) {
|
||||||
dumpMaterializer()
|
StreamTestKit.printDebugDump(materializer.supervisor)
|
||||||
throw new RuntimeException("Latch didn't complete in time")
|
throw new RuntimeException("Latch didn't complete in time")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def dumpMaterializer(): Unit = {
|
|
||||||
materializer match {
|
|
||||||
case impl: PhasedFusingActorMaterializer ⇒
|
|
||||||
val probe = TestProbe()(system)
|
|
||||||
impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
|
|
||||||
val children = probe.expectMsgType[StreamSupervisor.Children].children
|
|
||||||
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,22 +4,18 @@
|
||||||
|
|
||||||
package akka.stream
|
package akka.stream
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.stream.scaladsl._
|
import akka.remote.artery.{ BenchTestSource, FixedSizePartitionHub, LatchSink }
|
||||||
|
import akka.stream.scaladsl.{ PartitionHub, _ }
|
||||||
|
import akka.stream.testkit.scaladsl.StreamTestKit
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import org.openjdk.jmh.annotations._
|
import org.openjdk.jmh.annotations._
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.remote.artery.BenchTestSource
|
|
||||||
import java.util.concurrent.CountDownLatch
|
|
||||||
import akka.remote.artery.LatchSink
|
|
||||||
import akka.stream.impl.PhasedFusingActorMaterializer
|
|
||||||
import akka.testkit.TestProbe
|
|
||||||
import akka.stream.impl.StreamSupervisor
|
|
||||||
import akka.stream.scaladsl.PartitionHub
|
|
||||||
import akka.remote.artery.FixedSizePartitionHub
|
|
||||||
|
|
||||||
object PartitionHubBenchmark {
|
object PartitionHubBenchmark {
|
||||||
final val OperationsPerInvocation = 100000
|
final val OperationsPerInvocation = 100000
|
||||||
|
|
@ -112,13 +108,8 @@ class PartitionHubBenchmark {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def dumpMaterializer(): Unit = {
|
private def dumpMaterializer(): Unit = {
|
||||||
materializer match {
|
implicit val ec = materializer.system.dispatcher
|
||||||
case impl: PhasedFusingActorMaterializer ⇒
|
StreamTestKit.printDebugDump(materializer.supervisor)
|
||||||
val probe = TestProbe()(system)
|
|
||||||
impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
|
|
||||||
val children = probe.expectMsgType[StreamSupervisor.Children].children
|
|
||||||
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,11 +6,13 @@ package akka.stream.testkit.scaladsl
|
||||||
|
|
||||||
import akka.actor.{ ActorRef, ActorSystem }
|
import akka.actor.{ ActorRef, ActorSystem }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.stream.Materializer
|
import akka.stream._
|
||||||
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
|
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
|
||||||
|
import akka.stream.snapshot._
|
||||||
import akka.testkit.TestProbe
|
import akka.testkit.TestProbe
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
import scala.concurrent.{ Await, ExecutionContext }
|
||||||
|
|
||||||
object StreamTestKit {
|
object StreamTestKit {
|
||||||
|
|
||||||
|
|
@ -42,21 +44,121 @@ object StreamTestKit {
|
||||||
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = {
|
@InternalApi private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit = {
|
||||||
val probe = TestProbe()(sys)
|
val probe = TestProbe()(sys)
|
||||||
probe.within(5.seconds) {
|
probe.within(5.seconds) {
|
||||||
var children = Set.empty[ActorRef]
|
|
||||||
try probe.awaitAssert {
|
try probe.awaitAssert {
|
||||||
supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
|
supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
|
||||||
children = probe.expectMsgType[StreamSupervisor.Children].children
|
val children = probe.expectMsgType[StreamSupervisor.Children].children
|
||||||
assert(
|
assert(
|
||||||
children.isEmpty,
|
children.isEmpty,
|
||||||
s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]")
|
s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]")
|
||||||
}
|
} catch {
|
||||||
catch {
|
|
||||||
case ex: Throwable ⇒
|
case ex: Throwable ⇒
|
||||||
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
|
import sys.dispatcher
|
||||||
|
printDebugDump(supervisor)
|
||||||
throw ex
|
throw ex
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** INTERNAL API */
|
||||||
|
@InternalApi private[akka] def printDebugDump(streamSupervisor: ActorRef)(implicit ec: ExecutionContext): Unit = {
|
||||||
|
val doneDumping = MaterializerState.requestFromSupervisor(streamSupervisor)
|
||||||
|
.map(snapshots ⇒
|
||||||
|
snapshots.foreach(s ⇒ println(snapshotString(s.asInstanceOf[StreamSnapshotImpl]))
|
||||||
|
))
|
||||||
|
Await.result(doneDumping, 5.seconds)
|
||||||
|
}
|
||||||
|
|
||||||
|
/** INTERNAL API */
|
||||||
|
@InternalApi private[testkit] def snapshotString(snapshot: StreamSnapshotImpl): String = {
|
||||||
|
val builder = StringBuilder.newBuilder
|
||||||
|
builder.append(s"activeShells (actor: ${snapshot.self}):\n")
|
||||||
|
snapshot.activeInterpreters.foreach { shell ⇒
|
||||||
|
builder.append(" ")
|
||||||
|
appendShellSnapshot(builder, shell)
|
||||||
|
builder.append("\n")
|
||||||
|
appendInterpreterSnapshot(builder, shell.asInstanceOf[RunningInterpreterImpl])
|
||||||
|
builder.append("\n")
|
||||||
|
}
|
||||||
|
builder.append(s"newShells:\n")
|
||||||
|
snapshot.newShells.foreach { shell ⇒
|
||||||
|
builder.append(" ")
|
||||||
|
appendShellSnapshot(builder, shell)
|
||||||
|
builder.append("\n")
|
||||||
|
builder.append(" Not initialized")
|
||||||
|
builder.append("\n")
|
||||||
|
}
|
||||||
|
builder.toString
|
||||||
|
}
|
||||||
|
|
||||||
|
private def appendShellSnapshot(builder: StringBuilder, shell: InterpreterSnapshot): Unit = {
|
||||||
|
builder.append("GraphInterpreterShell(\n logics: [\n")
|
||||||
|
val logicsToPrint = shell.logics
|
||||||
|
logicsToPrint.foreach { logic ⇒
|
||||||
|
builder.append(" ")
|
||||||
|
.append(logic.label)
|
||||||
|
.append(" attrs: [")
|
||||||
|
.append(logic.attributes.attributeList.mkString(", "))
|
||||||
|
.append("],\n")
|
||||||
|
}
|
||||||
|
builder.setLength(builder.length - 2)
|
||||||
|
shell match {
|
||||||
|
case running: RunningInterpreter ⇒
|
||||||
|
builder.append("\n ],\n connections: [\n")
|
||||||
|
running.connections.foreach { connection ⇒
|
||||||
|
builder.append(" ")
|
||||||
|
.append("Connection(")
|
||||||
|
.append(connection.asInstanceOf[ConnectionSnapshotImpl].id)
|
||||||
|
.append(", ")
|
||||||
|
.append(connection.in.label)
|
||||||
|
.append(", ")
|
||||||
|
.append(connection.out.label)
|
||||||
|
.append(", ")
|
||||||
|
.append(connection.state)
|
||||||
|
.append(")\n")
|
||||||
|
}
|
||||||
|
builder.setLength(builder.length - 2)
|
||||||
|
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
|
builder.append("\n ]\n)")
|
||||||
|
builder.toString()
|
||||||
|
}
|
||||||
|
|
||||||
|
private def appendInterpreterSnapshot(builder: StringBuilder, snapshot: RunningInterpreterImpl): Unit = {
|
||||||
|
try {
|
||||||
|
builder.append("\ndot format graph for deadlock analysis:\n")
|
||||||
|
builder.append("================================================================\n")
|
||||||
|
builder.append("digraph waits {\n")
|
||||||
|
|
||||||
|
for (i ← snapshot.logics.indices) {
|
||||||
|
val logic = snapshot.logics(i)
|
||||||
|
builder.append(s""" N$i [label="${logic.label}"];""").append('\n')
|
||||||
|
}
|
||||||
|
|
||||||
|
for (connection ← snapshot.connections) {
|
||||||
|
val inName = "N" + connection.in.asInstanceOf[LogicSnapshotImpl].index
|
||||||
|
val outName = "N" + connection.out.asInstanceOf[LogicSnapshotImpl].index
|
||||||
|
|
||||||
|
builder.append(s" $inName -> $outName ")
|
||||||
|
connection.state match {
|
||||||
|
case ConnectionSnapshot.ShouldPull ⇒
|
||||||
|
builder.append("[label=shouldPull, color=blue];")
|
||||||
|
case ConnectionSnapshot.ShouldPush ⇒
|
||||||
|
builder.append(s"[label=shouldPush, color=red];")
|
||||||
|
case ConnectionSnapshot.Closed ⇒
|
||||||
|
builder.append("[style=dotted, label=closed, dir=both];")
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
|
builder.append("\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.append("}\n================================================================\n")
|
||||||
|
builder.append(s"// ${snapshot.queueStatus} (running=${snapshot.runningLogicsCount}, shutdown=${snapshot.stoppedLogics.mkString(",")})")
|
||||||
|
builder.toString()
|
||||||
|
} catch {
|
||||||
|
case _: NoSuchElementException ⇒ builder.append("Not all logics has a stage listed, cannot create graph")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,11 +4,14 @@
|
||||||
|
|
||||||
package akka.stream.testkit
|
package akka.stream.testkit
|
||||||
|
|
||||||
import akka.actor.{ ActorSystem, ActorRef }
|
import akka.actor.{ ActorRef, ActorSystem }
|
||||||
import akka.stream.impl.StreamSupervisor
|
import akka.stream.impl.StreamSupervisor
|
||||||
|
import akka.stream.snapshot.{ MaterializerState, StreamSnapshotImpl }
|
||||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||||
import com.typesafe.config.{ ConfigFactory, Config }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import org.scalatest.Failed
|
import org.scalatest.Failed
|
||||||
|
|
||||||
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) {
|
class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) {
|
||||||
|
|
@ -26,8 +29,12 @@ class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) {
|
||||||
override def withFixture(test: NoArgTest) = {
|
override def withFixture(test: NoArgTest) = {
|
||||||
super.withFixture(test) match {
|
super.withFixture(test) match {
|
||||||
case failed: Failed ⇒
|
case failed: Failed ⇒
|
||||||
|
implicit val ec = system.dispatcher
|
||||||
val probe = TestProbe()(system)
|
val probe = TestProbe()(system)
|
||||||
system.actorSelection("/user/" + StreamSupervisor.baseName + "*").tell(StreamSupervisor.GetChildren, probe.ref)
|
// FIXME I don't think it always runs under /user anymore (typed)
|
||||||
|
// FIXME correction - I'm not sure this works at _all_ - supposed to dump stream state if test fails
|
||||||
|
val streamSupervisors = system.actorSelection("/user/" + StreamSupervisor.baseName + "*")
|
||||||
|
streamSupervisors.tell(StreamSupervisor.GetChildren, probe.ref)
|
||||||
val children: Seq[ActorRef] = probe.receiveWhile(2.seconds) {
|
val children: Seq[ActorRef] = probe.receiveWhile(2.seconds) {
|
||||||
case StreamSupervisor.Children(children) ⇒ children
|
case StreamSupervisor.Children(children) ⇒ children
|
||||||
}.flatten
|
}.flatten
|
||||||
|
|
@ -35,7 +42,11 @@ class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) {
|
||||||
if (children.isEmpty) println("Stream is completed. No debug information is available")
|
if (children.isEmpty) println("Stream is completed. No debug information is available")
|
||||||
else {
|
else {
|
||||||
println("Stream actors alive: " + children)
|
println("Stream actors alive: " + children)
|
||||||
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
|
Future.sequence(children.map(MaterializerState.requestFromChild))
|
||||||
|
.foreach(snapshots ⇒
|
||||||
|
snapshots.foreach(s ⇒
|
||||||
|
akka.stream.testkit.scaladsl.StreamTestKit.snapshotString(s.asInstanceOf[StreamSnapshotImpl]))
|
||||||
|
)
|
||||||
}
|
}
|
||||||
failed
|
failed
|
||||||
case other ⇒ other
|
case other ⇒ other
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.snapshot
|
||||||
|
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.scaladsl.{ Sink, Source }
|
||||||
|
import akka.stream.testkit.StreamSpec
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
class MaterializerStateSpec extends StreamSpec {
|
||||||
|
|
||||||
|
"The MaterializerSnapshotting" must {
|
||||||
|
|
||||||
|
"snapshot a running stream" in {
|
||||||
|
implicit val mat = ActorMaterializer()
|
||||||
|
Source.maybe[Int]
|
||||||
|
.map(_.toString)
|
||||||
|
.zipWithIndex
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
|
||||||
|
awaitAssert({
|
||||||
|
val snapshot = MaterializerState.streamSnapshots(mat).futureValue
|
||||||
|
|
||||||
|
snapshot should have size (1)
|
||||||
|
snapshot.head.activeInterpreters should have size (1)
|
||||||
|
snapshot.head.activeInterpreters.head.logics should have size (4) // all 4 operators
|
||||||
|
}, 3.seconds)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -5,3 +5,8 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.OutputSt
|
||||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$DownstreamStatus")
|
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$DownstreamStatus")
|
||||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$Ok$")
|
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$Ok$")
|
||||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$Canceled$")
|
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.OutputStreamSourceStage$Canceled$")
|
||||||
|
|
||||||
|
# AST for stream layout dump
|
||||||
|
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.StreamSupervisor$PrintDebugDump$")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.dumpWaits")
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.dumpWaits")
|
||||||
|
|
|
||||||
|
|
@ -168,8 +168,6 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
|
||||||
case object StopChildren
|
case object StopChildren
|
||||||
/** Testing purpose */
|
/** Testing purpose */
|
||||||
case object StoppedChildren
|
case object StoppedChildren
|
||||||
/** Testing purpose */
|
|
||||||
case object PrintDebugDump
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ import akka.stream._
|
||||||
import akka.stream.impl.ReactiveStreamsCompliance._
|
import akka.stream.impl.ReactiveStreamsCompliance._
|
||||||
import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
|
import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
|
||||||
import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ }
|
import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ }
|
||||||
|
import akka.stream.snapshot._
|
||||||
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
|
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
|
||||||
import akka.util.OptionVal
|
import akka.util.OptionVal
|
||||||
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
||||||
|
|
@ -31,6 +32,7 @@ import scala.util.control.NonFatal
|
||||||
@InternalApi private[akka] object ActorGraphInterpreter {
|
@InternalApi private[akka] object ActorGraphInterpreter {
|
||||||
|
|
||||||
object Resume extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
object Resume extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||||
|
object Snapshot extends NoSerializationVerificationNeeded
|
||||||
|
|
||||||
trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded {
|
trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded {
|
||||||
def shell: GraphInterpreterShell
|
def shell: GraphInterpreterShell
|
||||||
|
|
@ -512,8 +514,6 @@ import scala.util.control.NonFatal
|
||||||
private var inputs: List[BatchingActorInputBoundary] = Nil
|
private var inputs: List[BatchingActorInputBoundary] = Nil
|
||||||
private var outputs: List[ActorOutputBoundary] = Nil
|
private var outputs: List[ActorOutputBoundary] = Nil
|
||||||
|
|
||||||
def dumpWaits(): Unit = interpreter.dumpWaits()
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Limits the number of events processed by the interpreter before scheduling
|
* Limits the number of events processed by the interpreter before scheduling
|
||||||
* a self-message for fairness with other actors. The basic assumption here is
|
* a self-message for fairness with other actors. The basic assumption here is
|
||||||
|
|
@ -637,30 +637,15 @@ import scala.util.control.NonFatal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def toString: String = {
|
def toSnapshot: InterpreterSnapshot = {
|
||||||
val builder = StringBuilder.newBuilder
|
if (!isInitialized)
|
||||||
builder.append("GraphInterpreterShell(\n logics: [\n")
|
UninitializedInterpreterImpl(
|
||||||
val logicsToPrint = if (isInitialized) interpreter.logics else logics
|
logics.zipWithIndex.map {
|
||||||
logicsToPrint.foreach { logic ⇒
|
case (logic, idx) ⇒
|
||||||
builder.append(" ")
|
LogicSnapshotImpl(idx, logic.originalStage.getOrElse(logic).toString, logic.attributes)
|
||||||
.append(logic.originalStage.getOrElse(logic).toString)
|
}.toVector
|
||||||
.append(" attrs: [")
|
)
|
||||||
.append(logic.attributes.attributeList.mkString(", "))
|
else interpreter.toSnapshot
|
||||||
.append("],\n")
|
|
||||||
}
|
|
||||||
builder.setLength(builder.length - 2)
|
|
||||||
if (isInitialized) {
|
|
||||||
builder.append("\n ],\n connections: [\n")
|
|
||||||
interpreter.connections.foreach { connection ⇒
|
|
||||||
builder
|
|
||||||
.append(" ")
|
|
||||||
.append(if (connection == null) "null" else connection.toString)
|
|
||||||
.append(",\n")
|
|
||||||
}
|
|
||||||
builder.setLength(builder.length - 2)
|
|
||||||
}
|
|
||||||
builder.append("\n ]\n)")
|
|
||||||
builder.toString()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -764,32 +749,12 @@ import scala.util.control.NonFatal
|
||||||
currentLimit = eventLimit
|
currentLimit = eventLimit
|
||||||
if (shortCircuitBuffer != null) shortCircuitBatch()
|
if (shortCircuitBuffer != null) shortCircuitBatch()
|
||||||
|
|
||||||
case StreamSupervisor.PrintDebugDump ⇒
|
case Snapshot ⇒
|
||||||
val builder = new java.lang.StringBuilder(s"activeShells (actor: $self):\n")
|
sender() ! StreamSnapshotImpl(
|
||||||
activeInterpreters.foreach { shell ⇒
|
self.path,
|
||||||
builder.append(" ")
|
activeInterpreters.map(shell ⇒ shell.toSnapshot.asInstanceOf[RunningInterpreter]).toSeq,
|
||||||
.append(shell.toString.replace("\n", "\n "))
|
newShells.map(shell ⇒ shell.toSnapshot.asInstanceOf[UninitializedInterpreter])
|
||||||
.append("\n")
|
)
|
||||||
if (shell.isInitialized) {
|
|
||||||
builder.append(shell.interpreter.toString)
|
|
||||||
} else {
|
|
||||||
builder.append(" Not initialized")
|
|
||||||
}
|
|
||||||
builder.append("\n")
|
|
||||||
}
|
|
||||||
builder.append(s"newShells:\n")
|
|
||||||
newShells.foreach { shell ⇒
|
|
||||||
builder.append(" ")
|
|
||||||
.append(shell.toString.replace("\n", "\n "))
|
|
||||||
.append("\n")
|
|
||||||
if (shell.isInitialized) {
|
|
||||||
builder.append(shell.interpreter.toString)
|
|
||||||
} else {
|
|
||||||
builder.append(" Not initialized")
|
|
||||||
}
|
|
||||||
builder.append("\n")
|
|
||||||
}
|
|
||||||
println(builder)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def postStop(): Unit = {
|
override def postStop(): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -15,8 +15,8 @@ import akka.annotation.InternalApi
|
||||||
|
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
import akka.stream.Attributes.LogLevels
|
import akka.stream.Attributes.LogLevels
|
||||||
|
import akka.stream.snapshot._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -85,10 +85,6 @@ import akka.stream.Attributes.LogLevels
|
||||||
var outHandler: OutHandler) {
|
var outHandler: OutHandler) {
|
||||||
var portState: Int = InReady
|
var portState: Int = InReady
|
||||||
var slot: Any = Empty
|
var slot: Any = Empty
|
||||||
|
|
||||||
override def toString =
|
|
||||||
if (GraphInterpreter.Debug) s"Connection($id, $inOwner, $outOwner, $inHandler, $outHandler, $portState, $slot)"
|
|
||||||
else s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] {
|
private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] {
|
||||||
|
|
@ -640,49 +636,40 @@ import akka.stream.Attributes.LogLevels
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Debug utility to dump the "waits-on" relationships in DOT format to the console for analysis of deadlocks.
|
* Debug utility to dump the "waits-on" relationships in an AST format for rendering in some suitable format for
|
||||||
* Use dot/graphviz to render graph.
|
* analysis of deadlocks.
|
||||||
*
|
*
|
||||||
* Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very
|
* Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very
|
||||||
* simplistic tool, make sure you are understanding what you are doing and then it will serve you well.
|
* simplistic tool, make sure you are understanding what you are doing and then it will serve you well.
|
||||||
*/
|
*/
|
||||||
def dumpWaits(): Unit = println(toString)
|
def toSnapshot: RunningInterpreter = {
|
||||||
|
|
||||||
override def toString: String = {
|
val logicSnapshots = logics.zipWithIndex.map {
|
||||||
try {
|
case (logic, idx) ⇒
|
||||||
val builder = new StringBuilder("\ndot format graph for deadlock analysis:\n")
|
|
||||||
builder.append("================================================================\n")
|
|
||||||
builder.append("digraph waits {\n")
|
|
||||||
|
|
||||||
for (i ← logics.indices) {
|
|
||||||
val logic = logics(i)
|
|
||||||
val label = logic.originalStage.getOrElse(logic).toString
|
val label = logic.originalStage.getOrElse(logic).toString
|
||||||
builder.append(s""" N$i [label="$label"];""").append('\n')
|
LogicSnapshotImpl(idx, label, logic.attributes)
|
||||||
}
|
|
||||||
|
|
||||||
val logicIndexes = logics.zipWithIndex.map { case (stage, idx) ⇒ stage → idx }.toMap
|
|
||||||
for (connection ← connections if connection != null) {
|
|
||||||
val inName = "N" + logicIndexes(connection.inOwner)
|
|
||||||
val outName = "N" + logicIndexes(connection.outOwner)
|
|
||||||
|
|
||||||
builder.append(s" $inName -> $outName ")
|
|
||||||
connection.portState match {
|
|
||||||
case InReady ⇒
|
|
||||||
builder.append("[label=shouldPull, color=blue];")
|
|
||||||
case OutReady ⇒
|
|
||||||
builder.append(s"[label=shouldPush, color=red];")
|
|
||||||
case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒
|
|
||||||
builder.append("[style=dotted, label=closed, dir=both];")
|
|
||||||
case _ ⇒
|
|
||||||
}
|
|
||||||
builder.append("\n")
|
|
||||||
}
|
|
||||||
|
|
||||||
builder.append("}\n================================================================\n")
|
|
||||||
builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})")
|
|
||||||
builder.toString()
|
|
||||||
} catch {
|
|
||||||
case _: NoSuchElementException ⇒ "Not all logics has a stage listed, cannot create graph"
|
|
||||||
}
|
}
|
||||||
|
val logicIndexes = logics.zipWithIndex.map { case (stage, idx) ⇒ stage → idx }.toMap
|
||||||
|
val connectionSnapshots = connections.filter(_ != null)
|
||||||
|
.map { connection ⇒
|
||||||
|
ConnectionSnapshotImpl(
|
||||||
|
connection.id,
|
||||||
|
logicSnapshots(logicIndexes(connection.inOwner)),
|
||||||
|
logicSnapshots(logicIndexes(connection.outOwner)),
|
||||||
|
connection.portState match {
|
||||||
|
case InReady ⇒ ConnectionSnapshot.ShouldPull
|
||||||
|
case OutReady ⇒ ConnectionSnapshot.ShouldPush
|
||||||
|
case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒ ConnectionSnapshot.Closed
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
RunningInterpreterImpl(
|
||||||
|
logicSnapshots.toVector,
|
||||||
|
connectionSnapshots.toVector,
|
||||||
|
queueStatus,
|
||||||
|
runningStages,
|
||||||
|
shutdownCounter.toList.map(n ⇒ logicSnapshots(n)))
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,201 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.stream.snapshot
|
||||||
|
|
||||||
|
import akka.actor.{ ActorPath, ActorRef }
|
||||||
|
import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
|
||||||
|
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
|
||||||
|
import akka.pattern.ask
|
||||||
|
import akka.stream.{ Attributes, Materializer }
|
||||||
|
import akka.stream.impl.fusing.ActorGraphInterpreter
|
||||||
|
import akka.util.Timeout
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
import scala.concurrent.{ ExecutionContext, Future }
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Debug utility to dump the running streams of a materializers in a structure describing the graph layout
|
||||||
|
* and "waits-on" relationships.
|
||||||
|
*
|
||||||
|
* Some of the data extracted may be off unless the stream has settled, for example in when deadlocked, but the
|
||||||
|
* structure should be valid regardless. Extracting the information often will have an impact on the performance
|
||||||
|
* of the running streams.
|
||||||
|
*/
|
||||||
|
object MaterializerState {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dump stream snapshots of all streams of the given materializer.
|
||||||
|
*/
|
||||||
|
@ApiMayChange
|
||||||
|
def streamSnapshots(mat: Materializer): Future[immutable.Seq[StreamSnapshot]] = {
|
||||||
|
mat match {
|
||||||
|
case impl: PhasedFusingActorMaterializer ⇒
|
||||||
|
import impl.system.dispatcher
|
||||||
|
requestFromSupervisor(impl.supervisor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** INTERNAL API */
|
||||||
|
@InternalApi
|
||||||
|
private[akka] def requestFromSupervisor(supervisor: ActorRef)(implicit ec: ExecutionContext): Future[immutable.Seq[StreamSnapshot]] = {
|
||||||
|
// FIXME arbitrary timeout
|
||||||
|
implicit val timeout: Timeout = 10.seconds
|
||||||
|
(supervisor ? StreamSupervisor.GetChildren)
|
||||||
|
.mapTo[StreamSupervisor.Children]
|
||||||
|
.flatMap(msg ⇒
|
||||||
|
Future.sequence(msg.children.toVector.map(requestFromChild))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/** INTERNAL API */
|
||||||
|
@InternalApi
|
||||||
|
private[akka] def requestFromChild(child: ActorRef)(implicit ec: ExecutionContext): Future[StreamSnapshot] = {
|
||||||
|
// FIXME arbitrary timeout
|
||||||
|
implicit val timeout: Timeout = 10.seconds
|
||||||
|
(child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot]
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A snapshot of one running stream
|
||||||
|
*
|
||||||
|
* Not for user extension
|
||||||
|
*/
|
||||||
|
@DoNotInherit @ApiMayChange
|
||||||
|
sealed trait StreamSnapshot {
|
||||||
|
/**
|
||||||
|
* Running interpreters
|
||||||
|
*/
|
||||||
|
def activeInterpreters: Seq[RunningInterpreter]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interpreters that has been created but not yet initialized - the stream is not yet running
|
||||||
|
*/
|
||||||
|
def newShells: Seq[UninitializedInterpreter]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A snapshot of one interpreter - contains a set of logics running in the same underlying actor. Note that
|
||||||
|
* multiple interpreters may be running in the same actor (because of submaterialization)
|
||||||
|
*
|
||||||
|
* Not for user extension
|
||||||
|
*/
|
||||||
|
@DoNotInherit @ApiMayChange
|
||||||
|
sealed trait InterpreterSnapshot {
|
||||||
|
def logics: immutable.Seq[LogicSnapshot]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A stream interpreter that was not yet initialized when the snapshot was taken
|
||||||
|
*
|
||||||
|
* Not for user extension
|
||||||
|
*/
|
||||||
|
@DoNotInherit @ApiMayChange
|
||||||
|
sealed trait UninitializedInterpreter extends InterpreterSnapshot
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A stream interpreter that is running/has been started
|
||||||
|
*/
|
||||||
|
@DoNotInherit @ApiMayChange
|
||||||
|
sealed trait RunningInterpreter extends InterpreterSnapshot {
|
||||||
|
/**
|
||||||
|
* Each of the materialized graph stage logics running inside the interpreter
|
||||||
|
*/
|
||||||
|
def logics: immutable.Seq[LogicSnapshot]
|
||||||
|
/**
|
||||||
|
* Each connection between logics in the interpreter
|
||||||
|
*/
|
||||||
|
def connections: immutable.Seq[ConnectionSnapshot]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total number of non-stopped logics in the interpreter
|
||||||
|
*/
|
||||||
|
def runningLogicsCount: Int
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All logics that has completed and is no longer executing
|
||||||
|
*/
|
||||||
|
def stoppedLogics: immutable.Seq[LogicSnapshot]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Not for user extension
|
||||||
|
*/
|
||||||
|
@DoNotInherit @ApiMayChange
|
||||||
|
sealed trait LogicSnapshot {
|
||||||
|
def label: String
|
||||||
|
def attributes: Attributes
|
||||||
|
}
|
||||||
|
|
||||||
|
@ApiMayChange
|
||||||
|
object ConnectionSnapshot {
|
||||||
|
sealed trait ConnectionState
|
||||||
|
case object ShouldPull extends ConnectionState
|
||||||
|
case object ShouldPush extends ConnectionState
|
||||||
|
case object Closed extends ConnectionState
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Not for user extension
|
||||||
|
*/
|
||||||
|
@DoNotInherit @ApiMayChange
|
||||||
|
sealed trait ConnectionSnapshot {
|
||||||
|
def in: LogicSnapshot
|
||||||
|
def out: LogicSnapshot
|
||||||
|
def state: ConnectionSnapshot.ConnectionState
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
final private[akka] case class StreamSnapshotImpl(
|
||||||
|
self: ActorPath,
|
||||||
|
activeInterpreters: Seq[RunningInterpreter],
|
||||||
|
newShells: Seq[UninitializedInterpreter]) extends StreamSnapshot with HideImpl
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
private[akka] final case class UninitializedInterpreterImpl(logics: immutable.Seq[LogicSnapshot]) extends UninitializedInterpreter
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
private[akka] final case class RunningInterpreterImpl(
|
||||||
|
logics: immutable.Seq[LogicSnapshot],
|
||||||
|
connections: immutable.Seq[ConnectionSnapshot],
|
||||||
|
queueStatus: String,
|
||||||
|
runningLogicsCount: Int,
|
||||||
|
stoppedLogics: immutable.Seq[LogicSnapshot]) extends RunningInterpreter with HideImpl
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
private[akka] final case class LogicSnapshotImpl(index: Int, label: String, attributes: Attributes) extends LogicSnapshot with HideImpl
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
private[akka] final case class ConnectionSnapshotImpl(
|
||||||
|
id: Int,
|
||||||
|
in: LogicSnapshot,
|
||||||
|
out: LogicSnapshot,
|
||||||
|
state: ConnectionSnapshot.ConnectionState) extends ConnectionSnapshot with HideImpl
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
trait HideImpl {
|
||||||
|
override def toString: String = super.toString.replaceFirst("Impl", "")
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue