diff --git a/akka-testkit/src/test/scala/akka/testkit/Coroner.scala b/akka-testkit/src/test/scala/akka/testkit/Coroner.scala index 39de089e8b..dd4b0b1f85 100644 --- a/akka-testkit/src/test/scala/akka/testkit/Coroner.scala +++ b/akka-testkit/src/test/scala/akka/testkit/Coroner.scala @@ -6,10 +6,10 @@ package akka.testkit import java.io.PrintStream import java.lang.management.{ ManagementFactory, ThreadInfo } import java.util.Date -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{ TimeoutException, CountDownLatch } import org.scalatest.{ BeforeAndAfterAll, Suite } import scala.annotation.tailrec -import scala.concurrent.{ Awaitable, CanAwait, Await } +import scala.concurrent.{ Promise, Awaitable, CanAwait, Await } import scala.concurrent.duration._ import scala.util.control.NonFatal @@ -32,37 +32,89 @@ object Coroner { * The result of this Awaitable will be `true` if it has been cancelled. */ trait WatchHandle extends Awaitable[Boolean] { + /** + * Will try to ensure that the Coroner has finished reporting. + */ def cancel(): Unit } + private class WatchHandleImpl(startAndStopDuration: FiniteDuration) + extends WatchHandle { + val cancelPromise = Promise[Boolean] + val startedLatch = new CountDownLatch(1) + val finishedLatch = new CountDownLatch(1) + + def waitForStart(): Unit = { + startedLatch.await(startAndStopDuration.length, startAndStopDuration.unit) + } + + def started(): Unit = startedLatch.countDown() + + def finished(): Unit = finishedLatch.countDown() + + def expired(): Unit = cancelPromise.trySuccess(false) + + override def cancel(): Unit = { + cancelPromise.trySuccess(true) + finishedLatch.await(startAndStopDuration.length, startAndStopDuration.unit) + } + + override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { + result(atMost) + this + } + + override def result(atMost: Duration)(implicit permit: CanAwait): Boolean = + try { Await.result(cancelPromise.future, atMost) } catch { case _: TimeoutException ⇒ false } + + } + + val defaultStartAndStopDuration = 1.second + /** * Ask the Coroner to print a report if it is not cancelled by the given deadline. * The returned handle can be used to perform the cancellation. + * + * If displayThreadCounts is set to true, then the Coroner will print thread counts during start + * and stop. */ - def watch(deadline: Deadline, reportTitle: String, out: PrintStream): WatchHandle = { - val cancelLatch = new CountDownLatch(1) with WatchHandle { - override def cancel(): Unit = countDown() - override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = { - result(atMost) - this - } - override def result(atMost: Duration)(implicit permit: CanAwait): Boolean = await(atMost.length, atMost.unit) - } + def watch(duration: FiniteDuration, reportTitle: String, out: PrintStream, + startAndStopDuration: FiniteDuration = defaultStartAndStopDuration, + displayThreadCounts: Boolean = false): WatchHandle = { - def triggerReportIfOverdue(duration: Duration): Unit = - if (!Await.result(cancelLatch, duration)) { - out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...") - try printReport(reportTitle, out) catch { - case NonFatal(ex) ⇒ { - out.println("Error displaying Coroner's Report") - ex.printStackTrace(out) - } - } finally out.flush() + val watchedHandle = new WatchHandleImpl(startAndStopDuration) + + def triggerReportIfOverdue(duration: Duration): Unit = { + val threadMx = ManagementFactory.getThreadMXBean() + val startThreads = threadMx.getThreadCount + if (displayThreadCounts) { + threadMx.resetPeakThreadCount() + out.println(s"Coroner Thread Count Start: current = ${startThreads} in ${reportTitle}") } - val duration = deadline.timeLeft // Store for later reporting - val thread = new Thread(new Runnable { def run = triggerReportIfOverdue(duration) }, "Coroner") - thread.start() // Must store thread in val to work around SI-7203 - cancelLatch + watchedHandle.started() + try { + if (!Await.result(watchedHandle, duration)) { + watchedHandle.expired() + out.println(s"Coroner not cancelled after ${duration.toMillis}ms. Looking for signs of foul play...") + try printReport(reportTitle, out) catch { + case NonFatal(ex) ⇒ { + out.println("Error displaying Coroner's Report") + ex.printStackTrace(out) + } + } + } + } finally { + if (displayThreadCounts) { + val endThreads = threadMx.getThreadCount + out.println(s"Coroner Thread Count End: current = ${endThreads}, diff = ${endThreads - startThreads}, peak = ${threadMx.getPeakThreadCount} in ${reportTitle}") + } + out.flush() + watchedHandle.finished() + } + } + new Thread(new Runnable { def run = triggerReportIfOverdue(duration) }, "Coroner").start() + watchedHandle.waitForStart() + watchedHandle } /** @@ -191,6 +243,9 @@ object Coroner { * and `stopCoroner` methods should be called before and after the test runs. * The Coroner will display its report if the test takes longer than the * (dilated) `expectedTestDuration` to run. + * + * If displayThreadCounts is set to true, then the Coroner will print thread + * counts during start and stop. */ trait WatchedByCoroner { self: TestKit ⇒ @@ -198,7 +253,8 @@ trait WatchedByCoroner { @volatile private var coronerWatch: Coroner.WatchHandle = _ final def startCoroner() { - coronerWatch = Coroner.watch(expectedTestDuration.dilated.fromNow, getClass.getName, System.err) + coronerWatch = Coroner.watch(expectedTestDuration.dilated, getClass.getName, System.err, + startAndStopDuration.dilated, displayThreadCounts) } final def stopCoroner() { @@ -207,4 +263,8 @@ trait WatchedByCoroner { } def expectedTestDuration: FiniteDuration + + def displayThreadCounts: Boolean = true + + def startAndStopDuration: FiniteDuration = Coroner.defaultStartAndStopDuration } \ No newline at end of file diff --git a/akka-testkit/src/test/scala/akka/testkit/CoronerSpec.scala b/akka-testkit/src/test/scala/akka/testkit/CoronerSpec.scala index 25ae910158..d222a17f75 100644 --- a/akka-testkit/src/test/scala/akka/testkit/CoronerSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/CoronerSpec.scala @@ -24,19 +24,35 @@ class CoronerSpec extends WordSpec with MustMatchers { "A Coroner" must { - "generate a report if enough time passes" taggedAs TimingTest in { - val (_, report) = captureOutput(out ⇒ Await.ready(Coroner.watch(100.milliseconds.fromNow, "XXXX", out), 1.second)) + "generate a report if enough time passes" in { + val (_, report) = captureOutput(out ⇒ { + val coroner = Coroner.watch(100.milliseconds, "XXXX", out) + Await.ready(coroner, 5.seconds) + coroner.cancel() + }) report must include("Coroner's Report") report must include("XXXX") } - "not generate a report if cancelled early" taggedAs TimingTest in { + "not generate a report if cancelled early" in { val (_, report) = captureOutput(out ⇒ { - val coroner = Coroner.watch(100.milliseconds.fromNow, "XXXX", out) + val coroner = Coroner.watch(60.seconds, "XXXX", out) + coroner.cancel() + Await.ready(coroner, 1.seconds) + }) + report must be("") + } + + "display thread counts if enabled" in { + val (_, report) = captureOutput(out ⇒ { + val coroner = Coroner.watch(60.seconds, "XXXX", out, displayThreadCounts = true) coroner.cancel() Await.ready(coroner, 1.second) }) - report must be("") + report must include("Coroner Thread Count Start:") + report must include("Coroner Thread Count End:") + report must include("XXXX") + report must not include ("Coroner's Report") } "display deadlock information in its report" in {