diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index e2df8ba19d..cf6ed866c8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -14,6 +14,8 @@ import org.scalatest.matchers.MustMatchers class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { import Ticket669Spec._ + override def beforeAll = Thread.interrupted() //remove interrupted status. + override def afterAll = { Actor.registry.local.shutdownAll akka.event.EventHandler.start @@ -21,6 +23,7 @@ class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll { "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { + val latch = new CountDownLatch(1) val sender = Actor.actorOf(new Sender(latch)).start() diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 156726ca0b..62297ca495 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -4,33 +4,49 @@ package akka.actor.dispatch import org.scalatest.junit.JUnitSuite -import org.junit.Test import org.scalatest.Assertions._ import akka.testkit.Testing import akka.dispatch._ import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit } +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor -import akka.util.{ Duration, Switch } -import org.multiverse.api.latches.StandardLatch -import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor } +import akka.util.Switch +import akka.actor.{ActorKilledException, PoisonPill, ActorRef, Actor} +import java.rmi.RemoteException +import org.junit.{After, Test} object ActorModelSpec { sealed trait ActorModelMessage + case class Reply_?(expect: Any) extends ActorModelMessage + case class Reply(expect: Any) extends ActorModelMessage + case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage + case class CountDown(latch: CountDownLatch) extends ActorModelMessage + case class Increment(counter: AtomicLong) extends ActorModelMessage + case class Await(latch: CountDownLatch) extends ActorModelMessage + case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage + case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage + case class Wait(time: Long) extends ActorModelMessage + case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage + + case object Interrupt extends ActorModelMessage + case object Restart extends ActorModelMessage + case class ThrowException(e: Throwable) extends ActorModelMessage + + val Ping = "Ping" val Pong = "Pong" @@ -52,17 +68,19 @@ object ActorModelSpec { } def receive = { - case Await(latch) ⇒ ack; latch.await(); busy.switchOff() - case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff() - case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() - case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() - case Reply(msg) ⇒ ack; self.reply(msg); busy.switchOff() - case Reply_?(msg) ⇒ ack; self.reply_?(msg); busy.switchOff() - case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() - case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() - case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() + case Await(latch) ⇒ ack; latch.await(); busy.switchOff() + case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff() + case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() + case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() + case Reply(msg) ⇒ ack; self.reply(msg); busy.switchOff() + case Reply_?(msg) ⇒ ack; self.reply_?(msg); busy.switchOff() + case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() + case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() + case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff() - case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") + case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") + case Interrupt => ack; busy.switchOff(); throw new InterruptedException("Ping!") + case ThrowException(e: Throwable) => ack; busy.switchOff(); throw e } } @@ -183,7 +201,9 @@ object ActorModelSpec { if (condition) return true Thread.sleep(intervalMs) - } catch { case e: InterruptedException ⇒ } + } catch { + case e: InterruptedException ⇒ + } } false } @@ -192,6 +212,7 @@ object ActorModelSpec { } abstract class ActorModelSpec extends JUnitSuite { + import ActorModelSpec._ protected def newInterceptedDispatcher: MessageDispatcherInterceptor @@ -215,13 +236,17 @@ abstract class ActorModelSpec extends JUnitSuite { msgsProcessed = 0, restarts = 0) - val futures = for (i ← 1 to 10) yield Future { i } + val futures = for (i ← 1 to 10) yield Future { + i + } await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 2, stops = 2) val a2 = newTestActor a2.start - val futures2 = for (i ← 1 to 10) yield Future { i } + val futures2 = for (i ← 1 to 10) yield Future { + i + } await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 3, stops = 2) @@ -259,7 +284,13 @@ abstract class ActorModelSpec extends JUnitSuite { val counter = new CountDownLatch(200) a.start() - for (i ← 1 to 10) { spawn { for (i ← 1 to 20) { a ! WaitAck(1, counter) } } } + for (i ← 1 to 10) { + spawn { + for (i ← 1 to 20) { + a ! WaitAck(1, counter) + } + } + } assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages") assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) @@ -267,7 +298,15 @@ abstract class ActorModelSpec extends JUnitSuite { } def spawn(f: ⇒ Unit) { - val thread = new Thread { override def run { try { f } catch { case e ⇒ e.printStackTrace } } } + val thread = new Thread { + override def run { + try { + f + } catch { + case e ⇒ e.printStackTrace + } + } + } thread.start() } @@ -329,8 +368,9 @@ abstract class ActorModelSpec extends JUnitSuite { def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) - (1 to num) foreach { _ ⇒ - newTestActor.start() ! cachedMessage + (1 to num) foreach { + _ ⇒ + newTestActor.start() ! cachedMessage } assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") } @@ -356,6 +396,52 @@ abstract class ActorModelSpec extends JUnitSuite { assert(each.exception.get.isInstanceOf[ActorKilledException]) a.stop() } + + @Test + def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + val f1 = a ? Reply("foo") + val f2 = a ? Reply("bar") + val f3 = a ? Interrupt + val f4 = a ? Reply("foo2") + val f5 = a ? Interrupt + val f6 = a ? Reply("bar2") + + assert(f1.get === "foo") + assert(f2.get === "bar") + assert((intercept[InterruptedException] { + f3.get + }).getMessage === "Ping!") + assert(f4.get === "foo2") + assert((intercept[InterruptedException] { + f5.get + }).getMessage === "Ping!") + assert(f6.get === "bar2") + } + + @Test + def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor.start() + val f1 = a ? Reply("foo") + val f2 = a ? Reply("bar") + val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException")) + val f4 = a ? Reply("foo2") + val f5 = a ? new ThrowException(new RemoteException("RemoteException")) + val f6 = a ? Reply("bar2") + + assert(f1.get === "foo") + assert(f2.get === "bar") + assert((intercept[IndexOutOfBoundsException] { + f3.get + }).getMessage === "IndexOutOfBoundsException") + assert(f4.get === "foo2") + assert((intercept[RemoteException] { + f5.get + }).getMessage === "RemoteException") + assert(f6.get === "bar2") + } } class DispatcherModelTest extends ActorModelSpec { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala index 6e1739b0bd..2f9ea89dd8 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala @@ -86,6 +86,7 @@ class FileBenchResultRepository extends BenchResultRepository { } private def save(stats: Stats) { + new File(dir).mkdirs if (!dirExists) return val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp)) val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser" @@ -98,8 +99,7 @@ class FileBenchResultRepository extends BenchResultRepository { case e: Exception ⇒ EventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]". format(stats, f.getAbsolutePath, e.getMessage)) - } - finally { + } finally { if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ } } } @@ -117,8 +117,7 @@ class FileBenchResultRepository extends BenchResultRepository { EventHandler.error(this, "Failed to load from [%s], due to [%s]". format(f.getAbsolutePath, e.getMessage)) None - } - finally { + } finally { if (in ne null) try { in.close() } catch { case ignore: Exception ⇒ } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala index ee06c33b5a..69a7b4bd08 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala @@ -52,8 +52,7 @@ trait PerformanceTest extends JUnitSuite { var stat: DescriptiveStatistics = _ val resultRepository = BenchResultRepository() - - val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + lazy val report = new Report(resultRepository, compareResultWith) type TS <: TradingSystem @@ -128,95 +127,7 @@ trait PerformanceTest extends JUnitSuite { resultRepository.add(stats) - EventHandler.info(this, formatResultsTable(resultRepository.get(name))) - - percentilesChart(stats) - latencyAndThroughputChart(stats) - comparePercentilesChart(stats) - compareWithHistoricalPercentiliesChart(stats) - - } - - def percentilesChart(stats: Stats) { - val chartTitle = stats.name + " Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - - def comparePercentilesChart(stats: Stats) { - for { - compareName ← compareResultWith - compareStats ← resultRepository.get(compareName, stats.load) - } { - val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name) - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - } - - def compareWithHistoricalPercentiliesChart(stats: Stats) { - val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load) - if (withHistorical.size > 1) { - val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle, - stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - } - - def latencyAndThroughputChart(stats: Stats) { - val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)" - val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle) - EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - } - - def formatResultsTable(statsSeq: Seq[Stats]): String = { - - val name = statsSeq.head.name - - val spaces = " " - val headerScenarioCol = ("Scenario" + spaces).take(name.length) - - val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil) - .mkString("\t") - val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil) - .mkString("\t") - val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString - val formattedStats = "\n" + - line.replace('-', '=') + "\n" + - headerLine + "\n" + - headerLine2 + "\n" + - line + "\n" + - statsSeq.map(formatStats(_)).mkString("\n") + "\n" + - line + "\n" - - formattedStats - - } - - def formatStats(stats: Stats): String = { - val durationS = stats.durationNanos.toDouble / 1000000000.0 - val duration = durationS.formatted("%.0f") - - val tpsStr = stats.tps.formatted("%.0f") - val meanStr = stats.mean.formatted("%.0f") - - val summaryLine = - stats.name :: - stats.load.toString :: - tpsStr :: - meanStr :: - stats.percentiles(5).toString :: - stats.percentiles(25).toString :: - stats.percentiles(50).toString :: - stats.percentiles(75).toString :: - stats.percentiles(95).toString :: - duration :: - stats.n.toString :: - Nil - - summaryLine.mkString("\t") - + report.html(resultRepository.get(name)) } def delay(delayMs: Int) { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala new file mode 100644 index 0000000000..9160fa631e --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala @@ -0,0 +1,179 @@ +package akka.performance.trading.common +import java.io.File +import java.text.SimpleDateFormat +import java.io.PrintWriter +import java.io.FileWriter +import akka.event.EventHandler +import java.util.Date + +class Report( + resultRepository: BenchResultRepository, + compareResultWith: Option[String] = None) { + + private val dir = System.getProperty("benchmark.resultDir", "target/benchmark") + + private def dirExists: Boolean = new File(dir).exists + private def log = System.getProperty("benchmark.logResult", "false").toBoolean + + val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss") + + def html(statistics: Seq[Stats]): Unit = if (dirExists) { + + val current = statistics.last + val sb = new StringBuilder + + val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp)) + sb.append(header(title)) + sb.append("

%s

\n".format(title)) + + sb.append("
\n")
+    sb.append(formatResultsTable(statistics))
+    sb.append("\n
\n") + + sb.append(img(percentilesChart(current))) + sb.append(img(latencyAndThroughputChart(current))) + + for (stats ← statistics) { + compareWithHistoricalPercentiliesChart(stats).foreach(url ⇒ sb.append(img(url))) + } + + for (stats ← statistics) { + comparePercentilesChart(stats).foreach(url ⇒ sb.append(img(url))) + } + + if (dirExists) { + val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) + val name = current.name + "--" + timestamp + ".html" + write(sb.toString, name) + } + + } + + private def img(url: String): String = { + """""".format( + url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n" + } + + def percentilesChart(stats: Stats): String = { + val chartTitle = stats.name + " Percentiles (microseconds)" + val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + chartUrl + } + + def comparePercentilesChart(stats: Stats): Seq[String] = { + for { + compareName ← compareResultWith.toSeq + compareStats ← resultRepository.get(compareName, stats.load) + } yield { + val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)" + val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name) + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + chartUrl + } + } + + def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = { + val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load) + if (withHistorical.size > 1) { + val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)" + val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle, + stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + Some(chartUrl) + } else { + None + } + } + + def latencyAndThroughputChart(stats: Stats): String = { + val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)" + val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle) + if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) + chartUrl + } + + def formatResultsTable(statsSeq: Seq[Stats]): String = { + + val name = statsSeq.head.name + + val spaces = " " + val headerScenarioCol = ("Scenario" + spaces).take(name.length) + + val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil) + .mkString("\t") + val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil) + .mkString("\t") + val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString + val formattedStats = "\n" + + line.replace('-', '=') + "\n" + + headerLine + "\n" + + headerLine2 + "\n" + + line + "\n" + + statsSeq.map(formatStats(_)).mkString("\n") + "\n" + + line + "\n" + + if (log) EventHandler.info(this, formattedStats) + + formattedStats + + } + + def formatStats(stats: Stats): String = { + val durationS = stats.durationNanos.toDouble / 1000000000.0 + val duration = durationS.formatted("%.0f") + + val tpsStr = stats.tps.formatted("%.0f") + val meanStr = stats.mean.formatted("%.0f") + + val summaryLine = + stats.name :: + stats.load.toString :: + tpsStr :: + meanStr :: + stats.percentiles(5).toString :: + stats.percentiles(25).toString :: + stats.percentiles(50).toString :: + stats.percentiles(75).toString :: + stats.percentiles(95).toString :: + duration :: + stats.n.toString :: + Nil + + summaryLine.mkString("\t") + + } + + def write(content: String, fileName: String) { + val f = new File(dir, fileName) + var writer: PrintWriter = null + try { + writer = new PrintWriter(new FileWriter(f)) + writer.print(content) + writer.flush() + } catch { + case e: Exception ⇒ + EventHandler.error(this, "Failed to save report to [%s], due to [%s]". + format(f.getAbsolutePath, e.getMessage)) + } finally { + if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } + } + } + + def header(title: String) = + """| + | + | + | + |%s + | + | + |""".stripMargin.format(title) + + def footer = + """|" + |""".stripMargin + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index f6d3b9de6f..9fbc5fd7ac 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -5,6 +5,7 @@ package akka.testkit import akka.actor.dispatch.ActorModelSpec import java.util.concurrent.CountDownLatch +import org.junit.{After, Test} class CallingThreadDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ @@ -42,6 +43,13 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec { //Can't handle this... } + + @After + def after { + //remove the interrupted status since we are messing with interrupted exceptions. + Thread.interrupted() + } + } // vim: set ts=2 sw=2 et: diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 542bece2ea..f87453db10 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -6,7 +6,7 @@ package akka import akka.actor.newUuid import java.net.{ InetAddress, UnknownHostException } - + /** * Akka base Exception. Each Exception gets: *