diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala
index 0cba96ffcc..c815518b80 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala
@@ -324,6 +324,8 @@ class DefaultSchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with Schedul
       }
     }
   }
+
+  override def expectedTestDuration = 5 minutes
 }
 
 class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRevolver) with SchedulerSpec {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index 144e7b3fa9..0f3fae3c7c 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -50,16 +50,23 @@ object MultiNodeClusterSpec {
     """)
 }
 
-trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeSpec ⇒
+trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec ⇒
 
   override def initialParticipants = roles.size
 
   private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
 
   override def atStartup(): Unit = {
+    startCoroner()
     muteLog()
   }
 
+  override def afterTermination(): Unit = {
+    stopCoroner()
+  }
+
+  override def expectedTestDuration = 60.seconds
+
   def muteLog(sys: ActorSystem = system): Unit = {
     if (!sys.log.isDebugEnabled) {
       Seq(".*Metrics collection has started successfully.*",
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala
index a09a98c8f8..c0902eeffd 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala
@@ -85,6 +85,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
       high-throughput-duration = 10s
       supervision-duration = 10s
       supervision-one-iteration = 1s
+      expected-test-duration = 600s
       # actors are created in a tree structure defined
       # by tree-width (number of children for each actor) and
       # tree-levels, total number of actors can be calculated by
@@ -169,6 +170,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
     val highThroughputDuration = getDuration("high-throughput-duration") * dFactor
     val supervisionDuration = getDuration("supervision-duration") * dFactor
     val supervisionOneIteration = getDuration("supervision-one-iteration") * dFactor
+    val expectedTestDuration = getDuration("expected-test-duration") * dFactor
     val treeWidth = getInt("tree-width")
     val treeLevels = getInt("tree-levels")
     val reportMetricsInterval = getDuration("report-metrics-interval")
@@ -650,6 +652,8 @@ abstract class StressSpec
 
   override def beforeEach(): Unit = { step += 1 }
 
+  override def expectedTestDuration = settings.expectedTestDuration
+
   override def muteLog(sys: ActorSystem = system): Unit = {
     super.muteLog(sys)
     sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))
diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
index 4a2e24dbd2..4cc59eca6e 100644
--- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
+++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
@@ -52,7 +52,7 @@ object AkkaSpec {
 }
 
 abstract class AkkaSpec(_system: ActorSystem)
-  extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll {
+  extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll with WatchedByCoroner {
 
   def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass),
     ConfigFactory.load(config.withFallback(AkkaSpec.testConf))))
@@ -66,6 +66,7 @@ abstract class AkkaSpec(_system: ActorSystem)
   val log: LoggingAdapter = Logging(system, this.getClass)
 
   final override def beforeAll {
+    startCoroner()
     atStartup()
   }
 
@@ -78,6 +79,7 @@ abstract class AkkaSpec(_system: ActorSystem)
         println(system.asInstanceOf[ActorSystemImpl].printTree)
     }
     afterTermination()
+    stopCoroner()
   }
 
   protected def atStartup() {}
@@ -88,4 +90,7 @@ abstract class AkkaSpec(_system: ActorSystem)
 
   def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit): Unit =
     Future(body)(system.dispatchers.lookup(dispatcherId))
+
+  override def expectedTestDuration: FiniteDuration = 60 seconds
+
 }
diff --git a/akka-testkit/src/test/scala/akka/testkit/Coroner.scala b/akka-testkit/src/test/scala/akka/testkit/Coroner.scala
new file mode 100644
index 0000000000..601c23abd4
--- /dev/null
+++ b/akka-testkit/src/test/scala/akka/testkit/Coroner.scala
@@ -0,0 +1,178 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc.
+ */
+package akka.testkit
+
+import java.io.PrintStream
+import java.lang.management.{ ManagementFactory, ThreadInfo }
+import java.util.Date
+import java.util.concurrent.CountDownLatch
+import org.scalatest.{ BeforeAndAfterAll, Suite }
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+/**
+ * The Coroner can be used to print a diagnostic report of the JVM state,
+ * including stack traces and deadlocks. A report can be printed directly, by
+ * calling `printReport`. Alternatively, the Coroner can be asked to `watch`
+ * the JVM and generate a report at a later time - unless the Coroner is cancelled
+ * by that time.
+ *
+ * The latter method is useful for printing diagnostics in the event that, for
+ * example, a unit test stalls and fails to cancel the Coroner in time. The
+ * Coroner will assume that the test has "died" and print a report to aid in
+ * debugging.
+ */
+object Coroner {
+
+  /**
+   * Used to cancel the Coroner after calling `watch`.
+   */
+  trait WatchHandle {
+    def cancel(): Unit
+  }
+
+  /**
+   * Ask the Coroner to print a report if it is not cancelled by the given deadline.
+   * The returned handle can be used to perform the cancellation.
+   *
+   * The waiting is done on a separate thread named "Coroner", which is started
+   * before this method returns.
+   *
+   * @param deadline the time after which the report is printed
+   * @param reportTitle the title handed to `printReport`
+   * @param out the stream the report is written to
+   * @return a handle that cancels the pending report
+   */
+  def watch(deadline: Deadline, reportTitle: String, out: PrintStream): WatchHandle = {
+    val duration = deadline.timeLeft // Store for later reporting
+    val cancelLatch = new CountDownLatch(1)
+
+    @tailrec def watchLoop(): Unit = {
+      if (deadline.isOverdue) {
+        triggerReport()
+      } else {
+        val remaining = deadline.timeLeft // Read the clock once per wait
+        val cancelled = try {
+          cancelLatch.await(remaining.length, remaining.unit)
+        } catch {
+          // An interrupt is not a cancellation: go round again and keep waiting
+          case _: InterruptedException ⇒ false
+        }
+        if (cancelled) {
+          // Our job is finished, let the thread stop
+        } else {
+          watchLoop()
+        }
+      }
+    }
+
+    def triggerReport(): Unit = {
+      out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...")
+      try {
+        printReport(reportTitle, out)
+      } catch {
+        case NonFatal(ex) ⇒ {
+          out.println("Error displaying Coroner's Report")
+          ex.printStackTrace(out)
+        }
+      }
+    }
+
+    val thread = new Thread(new Runnable { def run = watchLoop() }, "Coroner")
+    thread.start() // Must store thread in val to work around SI-7203
+
+    new WatchHandle {
+      def cancel(): Unit = { cancelLatch.countDown() }
+    }
+  }
+
+  /**
+   * Print a report containing diagnostic information.
+   *
+   * @param reportTitle the title shown in the first line of the report
+   * @param out the stream the report is written to
+   */
+  def printReport(reportTitle: String, out: PrintStream): Unit = {
+    import out.println
+
+    val osMx = ManagementFactory.getOperatingSystemMXBean()
+    val rtMx = ManagementFactory.getRuntimeMXBean()
+    val memMx = ManagementFactory.getMemoryMXBean()
+    val threadMx = ManagementFactory.getThreadMXBean()
+
+    println(s"""#Coroner's Report: $reportTitle
+                #OS Architecture: ${osMx.getArch()}
+                #Available processors: ${osMx.getAvailableProcessors()}
+                #System load (last minute): ${osMx.getSystemLoadAverage()}
+                #VM start time: ${new Date(rtMx.getStartTime())}
+                #VM uptime: ${rtMx.getUptime()}ms
+                #Heap usage: ${memMx.getHeapMemoryUsage()}
+                #Non-heap usage: ${memMx.getNonHeapMemoryUsage()}""".stripMargin('#'))
+
+    def dumpAllThreads(): Seq[ThreadInfo] = {
+      threadMx.dumpAllThreads(
+        threadMx.isObjectMonitorUsageSupported(),
+        threadMx.isSynchronizerUsageSupported())
+    }
+
+    def findDeadlockedThreads(): (Seq[ThreadInfo], String) = {
+      val (ids, desc) = if (threadMx.isSynchronizerUsageSupported()) {
+        (threadMx.findDeadlockedThreads(), "monitors and ownable synchronizers")
+      } else {
+        (threadMx.findMonitorDeadlockedThreads(), "monitors, but NOT ownable synchronizers")
+      }
+      if (ids == null) {
+        (Seq.empty, desc)
+      } else {
+        val maxTraceDepth = 1000 // Seems deep enough
+        // getThreadInfo yields null for a thread that has died in the meantime
+        (threadMx.getThreadInfo(ids, maxTraceDepth).filter(_ != null), desc)
+      }
+    }
+
+    def printThreadInfo(threadInfos: Seq[ThreadInfo]) = {
+      if (threadInfos.isEmpty) {
+        println("None")
+      } else {
+        for (ti ← threadInfos.sortBy(_.getThreadName)) { println(ti) }
+      }
+    }
+
+    println("All threads:")
+    printThreadInfo(dumpAllThreads())
+
+    val (deadlockedThreads, deadlockDesc) = findDeadlockedThreads()
+    println(s"Deadlocks found for $deadlockDesc:")
+    printThreadInfo(deadlockedThreads)
+  }
+
+}
+
+/**
+ * Mixin for tests that should be watched by the Coroner. The `startCoroner`
+ * and `stopCoroner` methods should be called before and after the test runs.
+ * The Coroner will display its report if the test takes longer than the
+ * (dilated) `expectedTestDuration` to run.
+ */
+trait WatchedByCoroner {
+  self: TestKit ⇒
+
+  @volatile private var coronerWatch: Coroner.WatchHandle = _
+
+  final def startCoroner(): Unit = {
+    coronerWatch = Coroner.watch(expectedTestDuration.dilated.fromNow, getClass.getName, System.err)
+  }
+
+  final def stopCoroner(): Unit = {
+    // Tolerate a stop without a matching start (e.g. after a failed setup) so
+    // that the original failure is not masked by a NullPointerException.
+    val watch = coronerWatch
+    coronerWatch = null
+    if (watch ne null) watch.cancel()
+  }
+
+  /** The time the test is expected to need, before dilation. */
+  def expectedTestDuration: FiniteDuration
+}
\ No newline at end of file
diff --git a/akka-testkit/src/test/scala/akka/testkit/CoronerSpec.scala b/akka-testkit/src/test/scala/akka/testkit/CoronerSpec.scala
new file mode 100644
index 0000000000..fb0ffa9222
--- /dev/null
+++ b/akka-testkit/src/test/scala/akka/testkit/CoronerSpec.scala
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc.
+ */
+package akka.testkit
+
+import java.io._
+import java.lang.management.ManagementFactory
+import java.util.concurrent.Semaphore
+import java.util.concurrent.locks.ReentrantLock
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import scala.concurrent.duration._
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class CoronerSpec extends WordSpec with MustMatchers {
+
+  /**
+   * Runs `f` against a `PrintStream` backed by an in-memory buffer and returns
+   * the result of `f` together with the text that was written to the stream.
+   */
+  private def captureOutput[A](f: PrintStream ⇒ A): (A, String) = {
+    val bytes = new ByteArrayOutputStream()
+    val out = new PrintStream(bytes)
+    val result = f(out)
+    (result, new String(bytes.toByteArray()))
+  }
+
+  "A Coroner" must {
+
+    "generate a report if enough time passes" in {
+      val (_, report) = captureOutput(out ⇒ {
+        Coroner.watch(500.milliseconds.fromNow, "XXXX", out)
+        Thread.sleep(1000.milliseconds.toMillis)
+      })
+      report must include("Coroner's Report")
+      report must include("XXXX")
+    }
+
+    "not generate a report if cancelled early" in {
+      val (_, report) = captureOutput(out ⇒ {
+        val coroner = Coroner.watch(500.milliseconds.fromNow, "XXXX", out)
+        coroner.cancel()
+        Thread.sleep(1000.milliseconds.toMillis)
+      })
+      report must be("")
+    }
+
+    "display deadlock information in its report" in {
+
+      // Create two threads that each recursively synchronize on a list of
+      // objects. Give each thread the same objects, but in reversed order.
+      // Control execution of the threads so that they each hold an object
+      // that the other wants to synchronize on. BOOM! Deadlock. Generate a
+      // report, then clean up and check the report contents.
+
+      case class LockingThread(name: String, thread: Thread, ready: Semaphore, proceed: Semaphore)
+
+      def lockingThread(name: String, initialLocks: List[ReentrantLock]): LockingThread = {
+        val ready = new Semaphore(0)
+        val proceed = new Semaphore(0)
+        val t = new Thread(new Runnable {
+          def run = try recursiveLock(initialLocks) catch { case _: InterruptedException ⇒ () }
+
+          def recursiveLock(locks: List[ReentrantLock]) {
+            locks match {
+              case Nil ⇒ ()
+              case lock :: rest ⇒ {
+                ready.release()
+                proceed.acquire()
+                lock.lockInterruptibly() // Allows us to break deadlock and free threads
+                try {
+                  recursiveLock(rest)
+                } finally {
+                  lock.unlock()
+                }
+              }
+            }
+          }
+        }, name)
+        t.start()
+        LockingThread(name, t, ready, proceed)
+      }
+
+      val x = new ReentrantLock()
+      val y = new ReentrantLock()
+      val a = lockingThread("deadlock-thread-a", List(x, y))
+      val b = lockingThread("deadlock-thread-b", List(y, x))
+
+      // Walk threads into deadlock
+      a.ready.acquire()
+      b.ready.acquire()
+      a.proceed.release()
+      b.proceed.release()
+      a.ready.acquire()
+      b.ready.acquire()
+      a.proceed.release()
+      b.proceed.release()
+
+      // Releasing the permits does not mean the threads are blocked yet. Wait
+      // (bounded) until each one is queued and parked on its second lock, so
+      // that the report cannot be generated before the deadlock exists.
+      def deadlocked: Boolean = x.hasQueuedThreads && y.hasQueuedThreads &&
+        a.thread.getState == Thread.State.WAITING && b.thread.getState == Thread.State.WAITING
+      val giveUp = 5.seconds.fromNow
+      while (!deadlocked && !giveUp.isOverdue) Thread.sleep(10)
+
+      val (_, report) = captureOutput(Coroner.printReport("Deadlock test", _))
+
+      a.thread.interrupt()
+      b.thread.interrupt()
+
+      report must include("Coroner's Report")
+
+      // Split test based on JVM capabilities. Not all JVMs can detect
+      // deadlock between ReentrantLocks. However, we need to use
+      // ReentrantLocks because normal, monitor-based locks cannot be
+      // un-deadlocked once this test is finished.
+
+      val threadMx = ManagementFactory.getThreadMXBean()
+      if (threadMx.isSynchronizerUsageSupported()) {
+        val sectionHeading = "Deadlocks found for monitors and ownable synchronizers"
+        report must include(sectionHeading)
+        val deadlockSection = report.split(sectionHeading)(1)
+        deadlockSection must include("deadlock-thread-a")
+        deadlockSection must include("deadlock-thread-b")
+      } else {
+        val sectionHeading = "Deadlocks found for monitors, but NOT ownable synchronizers"
+        report must include(sectionHeading)
+        val deadlockSection = report.split(sectionHeading)(1)
+        deadlockSection must include("None")
+        deadlockSection must not include ("deadlock-thread-a")
+        deadlockSection must not include ("deadlock-thread-b")
+      }
+    }
+
+  }
+}