From e0d73187bdd2dd8a92518c95f1226f30bc425f3b Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Fri, 22 Jul 2016 04:03:26 -0400 Subject: [PATCH] =str 20967 print stream state on test failed (#21003) --- .../akka/stream/testkit/StreamSpec.scala | 43 +++++++++++++++++++ .../scaladsl/FlowLimitWeightedSpec.scala | 5 +-- .../stream/scaladsl/FlowMapAsyncSpec.scala | 4 +- .../scaladsl/FlowMapAsyncUnorderedSpec.scala | 5 +-- .../stream/scaladsl/FlowMapConcatSpec.scala | 3 +- .../akka/stream/scaladsl/FlowMapSpec.scala | 9 ++-- .../stream/impl/ActorMaterializerImpl.scala | 4 +- 7 files changed, 54 insertions(+), 19 deletions(-) create mode 100644 akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala new file mode 100644 index 0000000000..7ab2d0fd80 --- /dev/null +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamSpec.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.testkit + +import akka.actor.{ ActorSystem, ActorRef } +import akka.stream.impl.StreamSupervisor +import akka.testkit.{ AkkaSpec, TestProbe } +import com.typesafe.config.{ ConfigFactory, Config } +import org.scalatest.Failed +import scala.concurrent.duration._ + +class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) { + def this(config: Config) = + this(ActorSystem( + AkkaSpec.getCallerName(getClass), + ConfigFactory.load(config.withFallback(AkkaSpec.testConf)))) + + def this(s: String) = this(ConfigFactory.parseString(s)) + + def this(configMap: Map[String, _]) = this(AkkaSpec.mapToConfig(configMap)) + + def this() = this(ActorSystem(AkkaSpec.getCallerName(getClass), AkkaSpec.testConf)) + + override def withFixture(test: NoArgTest) = { + super.withFixture(test) match { + case failed: Failed ⇒ + val probe = TestProbe()(system) + system.actorSelection("/user/" + StreamSupervisor.baseName + "*").tell(StreamSupervisor.GetChildren, probe.ref) + val children: Seq[ActorRef] = probe.receiveWhile(2.seconds) { + case StreamSupervisor.Children(children) ⇒ children + }.flatten + println("--- Stream actors debug dump ---") + if (children.isEmpty) println("Stream is completed. No debug information is available") + else { + println("Stream actors alive: " + children) + children.foreach(_ ! StreamSupervisor.PrintDebugDump) + } + failed + case other ⇒ other + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala index 367ff1a217..37e39290c3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLimitWeightedSpec.scala @@ -3,12 +3,11 @@ */ package akka.stream.scaladsl +import akka.stream.testkit.StreamSpec import akka.stream.{ StreamLimitReachedException, ActorMaterializer, ActorMaterializerSettings } -import akka.testkit.AkkaSpec import scala.concurrent.Await -import scala.concurrent.duration._ -class FlowLimitWeightedSpec extends AkkaSpec { +class FlowLimitWeightedSpec extends StreamSpec { val settings = ActorMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 2bf6fc708d..1e5dc84e76 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -20,10 +20,8 @@ import scala.annotation.tailrec import scala.concurrent.Promise import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.LinkedBlockingQueue -import org.scalatest.concurrent.ScalaFutures -import akka.testkit.AkkaSpec -class FlowMapAsyncSpec extends AkkaSpec { +class FlowMapAsyncSpec extends StreamSpec { implicit val materializer = ActorMaterializer() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index ef741d59bd..3d1d8955c5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -20,11 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Promise import java.util.concurrent.LinkedBlockingQueue import scala.annotation.tailrec -import org.scalatest.concurrent.ScalaFutures -import org.scalactic.ConversionCheckedTripleEquals -import akka.testkit.AkkaSpec -class FlowMapAsyncUnorderedSpec extends AkkaSpec { +class FlowMapAsyncUnorderedSpec extends StreamSpec { implicit val materializer = ActorMaterializer() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala index 5a7523a0c2..e05b651132 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala @@ -8,9 +8,8 @@ import akka.stream.{ Supervision, ActorAttributes, ActorMaterializer, ActorMater import akka.stream.testkit.Utils._ import akka.stream.testkit._ import scala.util.control.NoStackTrace -import akka.testkit.AkkaSpec -class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { +class FlowMapConcatSpec extends StreamSpec with ScriptedTest { val settings = ActorMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala index 0f2ab8f7f3..c9e6e908e3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala @@ -4,12 +4,11 @@ package akka.stream.scaladsl import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random } -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.testkit._ -import akka.testkit.AkkaSpec -class FlowMapSpec extends AkkaSpec with ScriptedTest { +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit._ + +class FlowMapSpec extends StreamSpec with ScriptedTest { val settings = ActorMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 8bcda5503c..8c2b997042 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -283,8 +283,8 @@ class FlowNames extends Extension { object StreamSupervisor { def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) - - private val actorName = SeqActorName("StreamSupervisor") + private[stream] val baseName = "StreamSupervisor" + private val actorName = SeqActorName(baseName) def nextName(): String = actorName.next() final case class Materialize(props: Props, name: String)