Count the number of threads during tests to find leaks. See #2936

This commit is contained in:
Björn Antonsson 2013-04-25 11:29:45 +02:00
parent 72bfc4e84f
commit d34996ae11
2 changed files with 106 additions and 30 deletions

View file

@ -6,10 +6,10 @@ package akka.testkit
import java.io.PrintStream
import java.lang.management.{ ManagementFactory, ThreadInfo }
import java.util.Date
import java.util.concurrent.CountDownLatch
import java.util.concurrent.{ TimeoutException, CountDownLatch }
import org.scalatest.{ BeforeAndAfterAll, Suite }
import scala.annotation.tailrec
import scala.concurrent.{ Awaitable, CanAwait, Await }
import scala.concurrent.{ Promise, Awaitable, CanAwait, Await }
import scala.concurrent.duration._
import scala.util.control.NonFatal
@ -32,37 +32,89 @@ object Coroner {
* The result of this Awaitable will be `true` if it has been cancelled.
*/
trait WatchHandle extends Awaitable[Boolean] {
/**
* Will try to ensure that the Coroner has finished reporting.
*/
def cancel(): Unit
}
private class WatchHandleImpl(startAndStopDuration: FiniteDuration)
extends WatchHandle {
val cancelPromise = Promise[Boolean]
val startedLatch = new CountDownLatch(1)
val finishedLatch = new CountDownLatch(1)
def waitForStart(): Unit = {
startedLatch.await(startAndStopDuration.length, startAndStopDuration.unit)
}
def started(): Unit = startedLatch.countDown()
def finished(): Unit = finishedLatch.countDown()
def expired(): Unit = cancelPromise.trySuccess(false)
override def cancel(): Unit = {
cancelPromise.trySuccess(true)
finishedLatch.await(startAndStopDuration.length, startAndStopDuration.unit)
}
override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
result(atMost)
this
}
override def result(atMost: Duration)(implicit permit: CanAwait): Boolean =
try { Await.result(cancelPromise.future, atMost) } catch { case _: TimeoutException false }
}
val defaultStartAndStopDuration = 1.second
/**
* Ask the Coroner to print a report if it is not cancelled by the given deadline.
* The returned handle can be used to perform the cancellation.
*
* If displayThreadCounts is set to true, then the Coroner will print thread counts during start
* and stop.
*/
def watch(deadline: Deadline, reportTitle: String, out: PrintStream): WatchHandle = {
val cancelLatch = new CountDownLatch(1) with WatchHandle {
override def cancel(): Unit = countDown()
override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
result(atMost)
this
}
override def result(atMost: Duration)(implicit permit: CanAwait): Boolean = await(atMost.length, atMost.unit)
}
def watch(duration: FiniteDuration, reportTitle: String, out: PrintStream,
startAndStopDuration: FiniteDuration = defaultStartAndStopDuration,
displayThreadCounts: Boolean = false): WatchHandle = {
def triggerReportIfOverdue(duration: Duration): Unit =
if (!Await.result(cancelLatch, duration)) {
out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...")
try printReport(reportTitle, out) catch {
case NonFatal(ex) {
out.println("Error displaying Coroner's Report")
ex.printStackTrace(out)
}
} finally out.flush()
val watchedHandle = new WatchHandleImpl(startAndStopDuration)
def triggerReportIfOverdue(duration: Duration): Unit = {
val threadMx = ManagementFactory.getThreadMXBean()
val startThreads = threadMx.getThreadCount
if (displayThreadCounts) {
threadMx.resetPeakThreadCount()
out.println(s"Coroner Thread Count Start: current = ${startThreads} in ${reportTitle}")
}
val duration = deadline.timeLeft // Store for later reporting
val thread = new Thread(new Runnable { def run = triggerReportIfOverdue(duration) }, "Coroner")
thread.start() // Must store thread in val to work around SI-7203
cancelLatch
watchedHandle.started()
try {
if (!Await.result(watchedHandle, duration)) {
watchedHandle.expired()
out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...")
try printReport(reportTitle, out) catch {
case NonFatal(ex) {
out.println("Error displaying Coroner's Report")
ex.printStackTrace(out)
}
}
}
} finally {
if (displayThreadCounts) {
val endThreads = threadMx.getThreadCount
out.println(s"Coroner Thread Count End: current = ${endThreads}, diff = ${endThreads - startThreads}, peak = ${threadMx.getPeakThreadCount} in ${reportTitle}")
}
out.flush()
watchedHandle.finished()
}
}
new Thread(new Runnable { def run = triggerReportIfOverdue(duration) }, "Coroner").start()
watchedHandle.waitForStart()
watchedHandle
}
/**
@ -191,6 +243,9 @@ object Coroner {
* and `stopCoroner` methods should be called before and after the test runs.
* The Coroner will display its report if the test takes longer than the
* (dilated) `expectedTestDuration` to run.
*
* If displayThreadCounts is set to true, then the Coroner will print thread
* counts during start and stop.
*/
trait WatchedByCoroner {
self: TestKit
@ -198,7 +253,8 @@ trait WatchedByCoroner {
@volatile private var coronerWatch: Coroner.WatchHandle = _
final def startCoroner() {
coronerWatch = Coroner.watch(expectedTestDuration.dilated.fromNow, getClass.getName, System.err)
coronerWatch = Coroner.watch(expectedTestDuration.dilated, getClass.getName, System.err,
startAndStopDuration.dilated, displayThreadCounts)
}
final def stopCoroner() {
@ -207,4 +263,8 @@ trait WatchedByCoroner {
}
def expectedTestDuration: FiniteDuration
def displayThreadCounts: Boolean = true
def startAndStopDuration: FiniteDuration = Coroner.defaultStartAndStopDuration
}

View file

@ -24,19 +24,35 @@ class CoronerSpec extends WordSpec with MustMatchers {
"A Coroner" must {
"generate a report if enough time passes" taggedAs TimingTest in {
val (_, report) = captureOutput(out Await.ready(Coroner.watch(100.milliseconds.fromNow, "XXXX", out), 1.second))
"generate a report if enough time passes" in {
val (_, report) = captureOutput(out {
val coroner = Coroner.watch(100.milliseconds, "XXXX", out)
Await.ready(coroner, 5.seconds)
coroner.cancel()
})
report must include("Coroner's Report")
report must include("XXXX")
}
"not generate a report if cancelled early" taggedAs TimingTest in {
"not generate a report if cancelled early" in {
val (_, report) = captureOutput(out {
val coroner = Coroner.watch(100.milliseconds.fromNow, "XXXX", out)
val coroner = Coroner.watch(60.seconds, "XXXX", out)
coroner.cancel()
Await.ready(coroner, 1.seconds)
})
report must be("")
}
"display thread counts if enabled" in {
val (_, report) = captureOutput(out {
val coroner = Coroner.watch(60.seconds, "XXXX", out, displayThreadCounts = true)
coroner.cancel()
Await.ready(coroner, 1.second)
})
report must be("")
report must include("Coroner Thread Count Start:")
report must include("Coroner Thread Count End:")
report must include("XXXX")
report must not include ("Coroner's Report")
}
"display deadlock information in its report" in {