=str 20967 print stream state on test failed (#21003)
This commit is contained in:
parent
14bfc353ba
commit
e0d73187bd
7 changed files with 54 additions and 19 deletions
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.stream.testkit
|
||||||
|
|
||||||
|
import akka.actor.{ ActorSystem, ActorRef }
|
||||||
|
import akka.stream.impl.StreamSupervisor
|
||||||
|
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||||
|
import com.typesafe.config.{ ConfigFactory, Config }
|
||||||
|
import org.scalatest.Failed
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
class StreamSpec(_system: ActorSystem) extends AkkaSpec(_system) {
|
||||||
|
def this(config: Config) =
|
||||||
|
this(ActorSystem(
|
||||||
|
AkkaSpec.getCallerName(getClass),
|
||||||
|
ConfigFactory.load(config.withFallback(AkkaSpec.testConf))))
|
||||||
|
|
||||||
|
def this(s: String) = this(ConfigFactory.parseString(s))
|
||||||
|
|
||||||
|
def this(configMap: Map[String, _]) = this(AkkaSpec.mapToConfig(configMap))
|
||||||
|
|
||||||
|
def this() = this(ActorSystem(AkkaSpec.getCallerName(getClass), AkkaSpec.testConf))
|
||||||
|
|
||||||
|
override def withFixture(test: NoArgTest) = {
|
||||||
|
super.withFixture(test) match {
|
||||||
|
case failed: Failed ⇒
|
||||||
|
val probe = TestProbe()(system)
|
||||||
|
system.actorSelection("/user/" + StreamSupervisor.baseName + "*").tell(StreamSupervisor.GetChildren, probe.ref)
|
||||||
|
val children: Seq[ActorRef] = probe.receiveWhile(2.seconds) {
|
||||||
|
case StreamSupervisor.Children(children) ⇒ children
|
||||||
|
}.flatten
|
||||||
|
println("--- Stream actors debug dump ---")
|
||||||
|
if (children.isEmpty) println("Stream is completed. No debug information is available")
|
||||||
|
else {
|
||||||
|
println("Stream actors alive: " + children)
|
||||||
|
children.foreach(_ ! StreamSupervisor.PrintDebugDump)
|
||||||
|
}
|
||||||
|
failed
|
||||||
|
case other ⇒ other
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,12 +3,11 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.scaladsl
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
|
import akka.stream.testkit.StreamSpec
|
||||||
import akka.stream.{ StreamLimitReachedException, ActorMaterializer, ActorMaterializerSettings }
|
import akka.stream.{ StreamLimitReachedException, ActorMaterializer, ActorMaterializerSettings }
|
||||||
import akka.testkit.AkkaSpec
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
|
||||||
|
|
||||||
class FlowLimitWeightedSpec extends AkkaSpec {
|
class FlowLimitWeightedSpec extends StreamSpec {
|
||||||
|
|
||||||
val settings = ActorMaterializerSettings(system)
|
val settings = ActorMaterializerSettings(system)
|
||||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||||
|
|
|
||||||
|
|
@ -20,10 +20,8 @@ import scala.annotation.tailrec
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
import org.scalatest.concurrent.ScalaFutures
|
|
||||||
import akka.testkit.AkkaSpec
|
|
||||||
|
|
||||||
class FlowMapAsyncSpec extends AkkaSpec {
|
class FlowMapAsyncSpec extends StreamSpec {
|
||||||
|
|
||||||
implicit val materializer = ActorMaterializer()
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,11 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
import java.util.concurrent.LinkedBlockingQueue
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import org.scalatest.concurrent.ScalaFutures
|
|
||||||
import org.scalactic.ConversionCheckedTripleEquals
|
|
||||||
import akka.testkit.AkkaSpec
|
|
||||||
|
|
||||||
class FlowMapAsyncUnorderedSpec extends AkkaSpec {
|
class FlowMapAsyncUnorderedSpec extends StreamSpec {
|
||||||
|
|
||||||
implicit val materializer = ActorMaterializer()
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,8 @@ import akka.stream.{ Supervision, ActorAttributes, ActorMaterializer, ActorMater
|
||||||
import akka.stream.testkit.Utils._
|
import akka.stream.testkit.Utils._
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
import akka.testkit.AkkaSpec
|
|
||||||
|
|
||||||
class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
|
class FlowMapConcatSpec extends StreamSpec with ScriptedTest {
|
||||||
|
|
||||||
val settings = ActorMaterializerSettings(system)
|
val settings = ActorMaterializerSettings(system)
|
||||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,11 @@
|
||||||
package akka.stream.scaladsl
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random }
|
import java.util.concurrent.ThreadLocalRandom.{ current ⇒ random }
|
||||||
import akka.stream.ActorMaterializer
|
|
||||||
import akka.stream.ActorMaterializerSettings
|
|
||||||
import akka.stream.testkit._
|
|
||||||
import akka.testkit.AkkaSpec
|
|
||||||
|
|
||||||
class FlowMapSpec extends AkkaSpec with ScriptedTest {
|
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
|
||||||
|
import akka.stream.testkit._
|
||||||
|
|
||||||
|
class FlowMapSpec extends StreamSpec with ScriptedTest {
|
||||||
|
|
||||||
val settings = ActorMaterializerSettings(system)
|
val settings = ActorMaterializerSettings(system)
|
||||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||||
|
|
|
||||||
|
|
@ -283,8 +283,8 @@ class FlowNames extends Extension {
|
||||||
object StreamSupervisor {
|
object StreamSupervisor {
|
||||||
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
|
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
|
||||||
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
|
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
|
||||||
|
private[stream] val baseName = "StreamSupervisor"
|
||||||
private val actorName = SeqActorName("StreamSupervisor")
|
private val actorName = SeqActorName(baseName)
|
||||||
def nextName(): String = actorName.next()
|
def nextName(): String = actorName.next()
|
||||||
|
|
||||||
final case class Materialize(props: Props, name: String)
|
final case class Materialize(props: Props, name: String)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue