Merge pull request #1211 from akka/wip-3110-test-thread-dump-rich

Dump diagnostics (Coroner's Report) if a test takes too long. Fixes #3110
This commit is contained in:
Rich Dougherty 2013-03-07 23:21:35 -08:00
commit a78e26ae41
6 changed files with 305 additions and 2 deletions

View file

@ -324,6 +324,8 @@ class DefaultSchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with Schedul
}
}
}
override def expectedTestDuration = 5 minutes
}
class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRevolver) with SchedulerSpec {

View file

@ -50,16 +50,23 @@ object MultiNodeClusterSpec {
""")
}
trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeSpec
trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec
override def initialParticipants = roles.size
private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
override def atStartup(): Unit = {
startCoroner()
muteLog()
}
override def afterTermination(): Unit = {
stopCoroner()
}
override def expectedTestDuration = 60.seconds
def muteLog(sys: ActorSystem = system): Unit = {
if (!sys.log.isDebugEnabled) {
Seq(".*Metrics collection has started successfully.*",

View file

@ -85,6 +85,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
high-throughput-duration = 10s
supervision-duration = 10s
supervision-one-iteration = 1s
expected-test-duration = 600s
# actors are created in a tree structure defined
# by tree-width (number of children for each actor) and
# tree-levels, total number of actors can be calculated by
@ -169,6 +170,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
val highThroughputDuration = getDuration("high-throughput-duration") * dFactor
val supervisionDuration = getDuration("supervision-duration") * dFactor
val supervisionOneIteration = getDuration("supervision-one-iteration") * dFactor
val expectedTestDuration = getDuration("expected-test-duration") * dFactor
val treeWidth = getInt("tree-width")
val treeLevels = getInt("tree-levels")
val reportMetricsInterval = getDuration("report-metrics-interval")
@ -650,6 +652,8 @@ abstract class StressSpec
override def beforeEach(): Unit = { step += 1 }
override def expectedTestDuration = settings.expectedTestDuration
override def muteLog(sys: ActorSystem = system): Unit = {
super.muteLog(sys)
sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))

View file

@ -52,7 +52,7 @@ object AkkaSpec {
}
abstract class AkkaSpec(_system: ActorSystem)
extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll {
extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll with WatchedByCoroner {
def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass),
ConfigFactory.load(config.withFallback(AkkaSpec.testConf))))
@ -66,6 +66,7 @@ abstract class AkkaSpec(_system: ActorSystem)
val log: LoggingAdapter = Logging(system, this.getClass)
final override def beforeAll {
startCoroner
atStartup()
}
@ -78,6 +79,7 @@ abstract class AkkaSpec(_system: ActorSystem)
println(system.asInstanceOf[ActorSystemImpl].printTree)
}
afterTermination()
stopCoroner()
}
protected def atStartup() {}
@ -88,4 +90,7 @@ abstract class AkkaSpec(_system: ActorSystem)
def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: Unit): Unit =
Future(body)(system.dispatchers.lookup(dispatcherId))
override def expectedTestDuration: FiniteDuration = 60 seconds
}

View file

@ -0,0 +1,160 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit
import java.io.PrintStream
import java.lang.management.{ ManagementFactory, ThreadInfo }
import java.util.Date
import java.util.concurrent.CountDownLatch
import org.scalatest.{ BeforeAndAfterAll, Suite }
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.control.NonFatal
/**
* The Coroner can be used to print a diagnostic report of the JVM state,
* including stack traces and deadlocks. A report can be printed directly, by
* calling `printReport`. Alternatively, the Coroner can be asked to `watch`
* the JVM and generate a report at a later time - unless the Coroner is cancelled
* by that time.
*
* The latter method is useful for printing diagnostics in the event that, for
* example, a unit test stalls and fails to cancel the Coroner in time. The
* Coroner will assume that the test has "died" and print a report to aid in
* debugging.
*/
object Coroner {
/**
* Used to cancel the Coroner after calling `watch`.
*/
trait WatchHandle {
def cancel(): Unit
}
/**
* Ask the Coroner to print a report if it is not cancelled by the given deadline.
* The returned handle can be used to perform the cancellation.
*/
def watch(deadline: Deadline, reportTitle: String, out: PrintStream): WatchHandle = {
val duration = deadline.timeLeft // Store for later reporting
val cancelLatch = new CountDownLatch(1)
@tailrec def watchLoop() {
if (deadline.isOverdue) {
triggerReport()
} else {
val cancelled = try {
cancelLatch.await(deadline.timeLeft.length, deadline.timeLeft.unit)
} catch {
case _: InterruptedException false
}
if (cancelled) {
// Our job is finished, let the thread stop
} else {
watchLoop()
}
}
}
def triggerReport() {
out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...")
try {
printReport(reportTitle, out)
} catch {
case NonFatal(ex) {
out.println("Error displaying Coroner's Report")
ex.printStackTrace(out)
}
}
}
val thread = new Thread(new Runnable { def run = watchLoop() }, "Coroner")
thread.start() // Must store thread in val to work around SI-7203
new WatchHandle {
def cancel() { cancelLatch.countDown() }
}
}
/**
* Print a report containing diagnostic information.
*/
def printReport(reportTitle: String, out: PrintStream) {
import out.println
val osMx = ManagementFactory.getOperatingSystemMXBean()
val rtMx = ManagementFactory.getRuntimeMXBean()
val memMx = ManagementFactory.getMemoryMXBean()
val threadMx = ManagementFactory.getThreadMXBean()
println(s"""#Coroner's Report: $reportTitle
#OS Architecture: ${osMx.getArch()}
#Available processors: ${osMx.getAvailableProcessors()}
#System load (last minute): ${osMx.getSystemLoadAverage()}
#VM start time: ${new Date(rtMx.getStartTime())}
#VM uptime: ${rtMx.getUptime()}ms
#Heap usage: ${memMx.getHeapMemoryUsage()}
#Non-heap usage: ${memMx.getNonHeapMemoryUsage()}""".stripMargin('#'))
def dumpAllThreads(): Seq[ThreadInfo] = {
threadMx.dumpAllThreads(
threadMx.isObjectMonitorUsageSupported(),
threadMx.isSynchronizerUsageSupported())
}
def findDeadlockedThreads(): (Seq[ThreadInfo], String) = {
val (ids, desc) = if (threadMx.isSynchronizerUsageSupported()) {
(threadMx.findDeadlockedThreads(), "monitors and ownable synchronizers")
} else {
(threadMx.findMonitorDeadlockedThreads(), "monitors, but NOT ownable synchronizers")
}
if (ids == null) {
(Seq.empty, desc)
} else {
val maxTraceDepth = 1000 // Seems deep enough
(threadMx.getThreadInfo(ids, maxTraceDepth), desc)
}
}
def printThreadInfo(threadInfos: Seq[ThreadInfo]) = {
if (threadInfos.isEmpty) {
println("None")
} else {
for (ti threadInfos.sortBy(_.getThreadName)) { println(ti) }
}
}
println("All threads:")
printThreadInfo(dumpAllThreads())
val (deadlockedThreads, deadlockDesc) = findDeadlockedThreads()
println(s"Deadlocks found for $deadlockDesc:")
printThreadInfo(deadlockedThreads)
}
}
/**
* Mixin for tests that should be watched by the Coroner. The `startCoroner`
* and `stopCoroner` methods should be called before and after the test runs.
* The Coroner will display its report if the test takes longer than the
* (dilated) `expectedTestDuration` to run.
*/
trait WatchedByCoroner {
self: TestKit
@volatile private var coronerWatch: Coroner.WatchHandle = _
final def startCoroner() {
coronerWatch = Coroner.watch(expectedTestDuration.dilated.fromNow, getClass.getName, System.err)
}
final def stopCoroner() {
coronerWatch.cancel()
coronerWatch = null
}
def expectedTestDuration: FiniteDuration
}

View file

@ -0,0 +1,125 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit
import java.io._
import java.lang.management.ManagementFactory
import java.util.concurrent.Semaphore
import java.util.concurrent.locks.ReentrantLock
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import scala.concurrent.duration._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CoronerSpec extends WordSpec with MustMatchers {
private def captureOutput[A](f: PrintStream A): (A, String) = {
val bytes = new ByteArrayOutputStream()
val out = new PrintStream(bytes)
val result = f(out)
(result, new String(bytes.toByteArray()))
}
"A Coroner" must {
"generate a report if enough time passes" in {
val (_, report) = captureOutput(out {
val handle = Coroner.watch(500.milliseconds.fromNow, "XXXX", out)
Thread.sleep(1000.milliseconds.toMillis)
})
report must include("Coroner's Report")
report must include("XXXX")
}
"not generate a report if cancelled early" in {
val (_, report) = captureOutput(out {
val coroner = Coroner.watch(500.milliseconds.fromNow, "XXXX", out)
coroner.cancel()
Thread.sleep(1000.milliseconds.toMillis)
})
report must be("")
}
"display deadlock information in its report" in {
// Create two threads that each recursively synchronize on a list of
// objects. Give each thread the same objects, but in reversed order.
// Control execution of the threads so that they each hold an object
// that the other wants to synchronize on. BOOM! Deadlock. Generate a
// report, then clean up and check the report contents.
case class LockingThread(name: String, thread: Thread, ready: Semaphore, proceed: Semaphore)
def lockingThread(name: String, initialLocks: List[ReentrantLock]): LockingThread = {
val ready = new Semaphore(0)
val proceed = new Semaphore(0)
val t = new Thread(new Runnable {
def run = try recursiveLock(initialLocks) catch { case _: InterruptedException () }
def recursiveLock(locks: List[ReentrantLock]) {
locks match {
case Nil ()
case lock :: rest {
ready.release()
proceed.acquire()
lock.lockInterruptibly() // Allows us to break deadlock and free threads
try {
recursiveLock(rest)
} finally {
lock.unlock()
}
}
}
}
}, name)
t.start()
LockingThread(name, t, ready, proceed)
}
val x = new ReentrantLock()
val y = new ReentrantLock()
val a = lockingThread("deadlock-thread-a", List(x, y))
val b = lockingThread("deadlock-thread-b", List(y, x))
// Walk threads into deadlock
a.ready.acquire()
b.ready.acquire()
a.proceed.release()
b.proceed.release()
a.ready.acquire()
b.ready.acquire()
a.proceed.release()
b.proceed.release()
val (_, report) = captureOutput(Coroner.printReport("Deadlock test", _))
a.thread.interrupt()
b.thread.interrupt()
report must include("Coroner's Report")
// Split test based on JVM capabilities. Not all JVMs can detect
// deadlock between ReentrantLocks. However, we need to use
// ReentrantLocks because normal, monitor-based locks cannot be
// un-deadlocked once this test is finished.
val threadMx = ManagementFactory.getThreadMXBean()
if (threadMx.isSynchronizerUsageSupported()) {
val sectionHeading = "Deadlocks found for monitors and ownable synchronizers"
report must include(sectionHeading)
val deadlockSection = report.split(sectionHeading)(1)
deadlockSection must include("deadlock-thread-a")
deadlockSection must include("deadlock-thread-b")
} else {
val sectionHeading = "Deadlocks found for monitors, but NOT ownable synchronizers"
report must include(sectionHeading)
val deadlockSection = report.split(sectionHeading)(1)
deadlockSection must include("None")
deadlockSection must not include ("deadlock-thread-a")
deadlockSection must not include ("deadlock-thread-b")
}
}
}
}