diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala index 6cbd6ee4ca..5442deacd5 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchmarkScenarios.scala @@ -29,12 +29,14 @@ trait BenchmarkScenarios extends PerformanceTest { def complexScenario80 = complexScenario(80) @Test def complexScenario100 = complexScenario(100) + /* @Test def complexScenario200 = complexScenario(200) @Test def complexScenario300 = complexScenario(300) @Test def complexScenario400 = complexScenario(400) + */ def complexScenario(numberOfClients: Int) { Assume.assumeTrue(numberOfClients >= minClients) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala index e9162299d1..869c186524 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala @@ -7,11 +7,13 @@ import akka.event.EventHandler trait OrderReceiver { type ME - val matchingEngines: List[ME] - var matchingEnginePartitionsIsStale = true var matchingEngineForOrderbook: Map[String, ME] = Map() - def refreshMatchingEnginePartitions() { + def refreshMatchingEnginePartitions(routing: MatchingEngineRouting[ME]) { + + val matchingEngines: List[ME] = routing.mapping.keys.toList + def supportedOrderbooks(me: ME): List[String] = routing.mapping(me) + val m = Map() ++ (for { me ← matchingEngines @@ -19,14 +21,11 @@ trait OrderReceiver { } yield (orderbookSymbol, me)) matchingEngineForOrderbook = m - matchingEnginePartitionsIsStale = false } - def supportedOrderbooks(me: ME): List[String] - } -class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher]) +class AkkaOrderReceiver(disp: Option[MessageDispatcher]) extends Actor with OrderReceiver { type ME = ActorRef @@ -34,21 +33,13 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp self.dispatcher = d } - override val matchingEngines: List[ActorRef] = matchingEngineRouting.keys.toList - - override def preStart() { - refreshMatchingEnginePartitions() - } - def receive = { + case routing@MatchingEngineRouting(mapping) ⇒ + refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]]) case order: Order ⇒ placeOrder(order) case unknown ⇒ EventHandler.warning(this, "Received unknown message: " + unknown) } - override def supportedOrderbooks(me: ActorRef): List[String] = { - matchingEngineRouting(me) - } - def placeOrder(order: Order) = { val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol) matchingEngine match { @@ -60,3 +51,5 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp } } } + +case class MatchingEngineRouting[ME](mapping: Map[ME, List[String]]) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala index 69a7b4bd08..3c160a09ab 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala @@ -1,7 +1,5 @@ package akka.performance.trading.common -import java.text.SimpleDateFormat -import java.util.Date import java.util.Random import scala.collection.immutable.TreeMap @@ -12,11 +10,13 @@ import org.junit.After import org.junit.Before import org.scalatest.junit.JUnitSuite -import akka.event.EventHandler import akka.performance.trading.domain.Ask import akka.performance.trading.domain.Bid import akka.performance.trading.domain.Order import akka.performance.trading.domain.TotalTradeCounter +import akka.performance.workbench.BenchResultRepository +import akka.performance.workbench.Report +import akka.performance.workbench.Stats trait PerformanceTest extends JUnitSuite { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala deleted file mode 100644 index 9160fa631e..0000000000 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Report.scala +++ /dev/null @@ -1,179 +0,0 @@ -package akka.performance.trading.common -import java.io.File -import java.text.SimpleDateFormat -import java.io.PrintWriter -import java.io.FileWriter -import akka.event.EventHandler -import java.util.Date - -class Report( - resultRepository: BenchResultRepository, - compareResultWith: Option[String] = None) { - - private val dir = System.getProperty("benchmark.resultDir", "target/benchmark") - - private def dirExists: Boolean = new File(dir).exists - private def log = System.getProperty("benchmark.logResult", "false").toBoolean - - val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") - val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") - val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss") - - def html(statistics: Seq[Stats]): Unit = if (dirExists) { - - val current = statistics.last - val sb = new StringBuilder - - val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp)) - sb.append(header(title)) - sb.append("

%s

\n".format(title)) - - sb.append("
\n")
-    sb.append(formatResultsTable(statistics))
-    sb.append("\n
\n") - - sb.append(img(percentilesChart(current))) - sb.append(img(latencyAndThroughputChart(current))) - - for (stats ← statistics) { - compareWithHistoricalPercentiliesChart(stats).foreach(url ⇒ sb.append(img(url))) - } - - for (stats ← statistics) { - comparePercentilesChart(stats).foreach(url ⇒ sb.append(img(url))) - } - - if (dirExists) { - val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) - val name = current.name + "--" + timestamp + ".html" - write(sb.toString, name) - } - - } - - private def img(url: String): String = { - """""".format( - url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n" - } - - def percentilesChart(stats: Stats): String = { - val chartTitle = stats.name + " Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - chartUrl - } - - def comparePercentilesChart(stats: Stats): Seq[String] = { - for { - compareName ← compareResultWith.toSeq - compareStats ← resultRepository.get(compareName, stats.load) - } yield { - val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name) - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - chartUrl - } - } - - def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = { - val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load) - if (withHistorical.size > 1) { - val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)" - val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle, - stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - Some(chartUrl) - } else { - None - } - } - - def latencyAndThroughputChart(stats: Stats): String = { - val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)" - val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle) - if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl) - chartUrl - } - - def formatResultsTable(statsSeq: Seq[Stats]): String = { - - val name = statsSeq.head.name - - val spaces = " " - val headerScenarioCol = ("Scenario" + spaces).take(name.length) - - val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil) - .mkString("\t") - val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil) - .mkString("\t") - val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString - val formattedStats = "\n" + - line.replace('-', '=') + "\n" + - headerLine + "\n" + - headerLine2 + "\n" + - line + "\n" + - statsSeq.map(formatStats(_)).mkString("\n") + "\n" + - line + "\n" - - if (log) EventHandler.info(this, formattedStats) - - formattedStats - - } - - def formatStats(stats: Stats): String = { - val durationS = stats.durationNanos.toDouble / 1000000000.0 - val duration = durationS.formatted("%.0f") - - val tpsStr = stats.tps.formatted("%.0f") - val meanStr = stats.mean.formatted("%.0f") - - val summaryLine = - stats.name :: - stats.load.toString :: - tpsStr :: - meanStr :: - stats.percentiles(5).toString :: - stats.percentiles(25).toString :: - stats.percentiles(50).toString :: - stats.percentiles(75).toString :: - stats.percentiles(95).toString :: - duration :: - stats.n.toString :: - Nil - - summaryLine.mkString("\t") - - } - - def write(content: String, fileName: String) { - val f = new File(dir, fileName) - var writer: PrintWriter = null - try { - writer = new PrintWriter(new FileWriter(f)) - writer.print(content) - writer.flush() - } catch { - case e: Exception ⇒ - EventHandler.error(this, "Failed to save report to [%s], due to [%s]". - format(f.getAbsolutePath, e.getMessage)) - } finally { - if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } - } - } - - def header(title: String) = - """| - | - | - | - |%s - | - | - |""".stripMargin.format(title) - - def footer = - """|" - |""".stripMargin - -} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala index 44951879c5..ed822cf1be 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/TradingSystem.scala @@ -75,7 +75,7 @@ class AkkaTradingSystem extends TradingSystem { (1 to 10).toList map (i ⇒ createOrderReceiver()) } - def matchingEngineRouting: Map[ActorRef, List[String]] = { + def matchingEngineRouting: MatchingEngineRouting[ActorRef] = { val rules = for { info ← matchingEngines @@ -84,11 +84,11 @@ class AkkaTradingSystem extends TradingSystem { (info.primary, orderbookSymbols) } - Map() ++ rules + MatchingEngineRouting(Map() ++ rules) } def createOrderReceiver() = - actorOf(new AkkaOrderReceiver(matchingEngineRouting, orDispatcher)) + actorOf(new AkkaOrderReceiver(orDispatcher)) override def start() { for (MatchingEngineInfo(p, s, o) ← matchingEngines) { @@ -97,7 +97,11 @@ class AkkaTradingSystem extends TradingSystem { s.foreach(_.start()) s.foreach(p ! _) } - orderReceivers.foreach(_.start()) + val routing = matchingEngineRouting + for (or ← orderReceivers) { + or.start() + or ! routing + } } override def shutdown() { diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala index d64639d3fa..16e3a41048 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala @@ -6,8 +6,8 @@ import akka.event.EventHandler import akka.performance.trading.domain._ import akka.performance.trading.common.AkkaOrderReceiver -class OneWayOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher]) - extends AkkaOrderReceiver(matchingEngineRouting, disp) { +class OneWayOrderReceiver(disp: Option[MessageDispatcher]) + extends AkkaOrderReceiver(disp) { override def placeOrder(order: Order) = { val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol) diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala index d6fcafbf7c..57737e9c0e 100755 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayTradingSystem.scala @@ -11,6 +11,6 @@ class OneWayTradingSystem extends AkkaTradingSystem { actorOf(new OneWayMatchingEngine(meId, orderbooks, meDispatcher)) override def createOrderReceiver() = - actorOf(new OneWayOrderReceiver(matchingEngineRouting, orDispatcher)) + actorOf(new OneWayOrderReceiver(orDispatcher)) } \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala similarity index 72% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala rename to akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala index 2f9ea89dd8..0c8e5f0cb2 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala @@ -1,16 +1,18 @@ -package akka.performance.trading.common +package akka.performance.workbench import java.io.BufferedInputStream import java.io.BufferedOutputStream import java.io.File import java.io.FileInputStream import java.io.FileOutputStream +import java.io.FileWriter import java.io.ObjectInputStream import java.io.ObjectOutputStream +import java.io.PrintWriter import java.text.SimpleDateFormat import java.util.Date -import scala.collection.mutable.{ Map ⇒ MutableMap } +import scala.collection.mutable.{Map => MutableMap} import akka.event.EventHandler @@ -23,6 +25,10 @@ trait BenchResultRepository { def getWithHistorical(name: String, load: Int): Seq[Stats] + def saveHtmlReport(content: String, name: String): Unit + + def htmlReportUrl(name: String): String + } object BenchResultRepository { @@ -34,8 +40,10 @@ class FileBenchResultRepository extends BenchResultRepository { private val statsByName = MutableMap[String, Seq[Stats]]() private val baselineStats = MutableMap[Key, Stats]() private val historicalStats = MutableMap[Key, Seq[Stats]]() - private val dir = System.getProperty("benchmark.resultDir", "target/benchmark") - private def dirExists: Boolean = new File(dir).exists + private val serDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/ser" + private def serDirExists: Boolean = new File(serDir).exists + private val htmlDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/html" + private def htmlDirExists: Boolean = new File(htmlDir).exists protected val maxHistorical = 7 case class Key(name: String, load: Int) @@ -64,10 +72,10 @@ class FileBenchResultRepository extends BenchResultRepository { } private def loadFiles() { - if (dirExists) { + if (serDirExists) { val files = for { - f ← new File(dir).listFiles + f ← new File(serDir).listFiles if f.isFile if f.getName.endsWith(".ser") } yield f @@ -86,11 +94,11 @@ class FileBenchResultRepository extends BenchResultRepository { } private def save(stats: Stats) { - new File(dir).mkdirs - if (!dirExists) return + new File(serDir).mkdirs + if (!serDirExists) return val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp)) val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser" - val f = new File(dir, name) + val f = new File(serDir, name) var out: ObjectOutputStream = null try { out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f))) @@ -127,5 +135,24 @@ class FileBenchResultRepository extends BenchResultRepository { loadFiles() + def saveHtmlReport(content: String, fileName: String) { + new File(htmlDir).mkdirs + if (!htmlDirExists) return + val f = new File(htmlDir, fileName) + var writer: PrintWriter = null + try { + writer = new PrintWriter(new FileWriter(f)) + writer.print(content) + writer.flush() + } catch { + case e: Exception ⇒ + EventHandler.error(this, "Failed to save report to [%s], due to [%s]". + format(f.getAbsolutePath, e.getMessage)) + } finally { + if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } + } + } + + def htmlReportUrl(fileName: String): String = new File(htmlDir, fileName).getAbsolutePath } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala similarity index 85% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala rename to akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala index d7f6c965a3..a2d2a381a8 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/GoogleChartBuilder.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala @@ -1,10 +1,9 @@ -package akka.performance.trading.common +package akka.performance.workbench import java.io.UnsupportedEncodingException import java.net.URLEncoder + import scala.collection.immutable.TreeMap -import java.util.Locale -import java.util.Formatter /** * Generates URLs to Google Chart API http://code.google.com/apis/chart/ @@ -15,9 +14,9 @@ object GoogleChartBuilder { val ChartHeight = 400 /** - * Builds a bar chart for all percentiles in the statistics. + * Builds a bar chart for all percentiles and the mean in the statistics. */ - def percentilChartUrl(statistics: Seq[Stats], title: String, legend: Stats ⇒ String): String = { + def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats ⇒ String): String = { if (statistics.isEmpty) return "" val current = statistics.last @@ -38,6 +37,7 @@ object GoogleChartBuilder { sb.append("&") // labels percentileLabels(current.percentiles, sb) + sb.append("|mean") sb.append("|2:|min|mean|median") sb.append("&") // label positions @@ -63,7 +63,7 @@ object GoogleChartBuilder { // data series val maxValue = statistics.map(_.percentiles.last._2).max sb.append("chd=t:") - dataSeries(statistics.map(_.percentiles), sb) + dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb) // y range sb.append("&") @@ -98,13 +98,18 @@ object GoogleChartBuilder { sb.append(s) } - private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], sb: StringBuilder) { - val series = + private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], meanValues: Seq[Double], sb: StringBuilder) { + val percentileSeries = for { percentiles ← allPercentiles } yield { percentiles.values.mkString(",") } + + val series = + for ((s, m) ← percentileSeries.zip(meanValues)) + yield s + "," + formatDouble(m) + sb.append(series.mkString("|")) } @@ -144,11 +149,11 @@ object GoogleChartBuilder { sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767") sb.append("&") sb.append("chco=") - val seriesColors = List("25B33B", "3072F3", "FF0000", "FF9900") + val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900") sb.append(seriesColors.mkString(",")) sb.append("&") // legend - sb.append("chdl=5th Percentile|Median|95th Percentile|Throughput") + sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput") sb.append("&") sb.append("chdlp=b") @@ -160,6 +165,7 @@ object GoogleChartBuilder { sb.append("chls=1|1|1") sb.append("&") + // margins sb.append("chma=5,5,5,25") sb.append("&") @@ -181,6 +187,11 @@ object GoogleChartBuilder { } sb.append(percentileSeries.mkString("|")) + sb.append("|") + sb.append(loadStr).append("|") + val meanSeries = statistics.map(s ⇒ formatDouble(s.mean)).mkString(",") + sb.append(meanSeries) + sb.append("|") val maxTps: Double = statistics.map(_.tps).max sb.append(loadStr).append("|") @@ -192,7 +203,7 @@ object GoogleChartBuilder { // y range sb.append("&") - sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append("|1,0,").append(maxValue).append("|2,0,") + sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,") .append(formatDouble(maxTps)) sb.append("&") @@ -203,6 +214,9 @@ object GoogleChartBuilder { sb.append(",") } sb.append(minLoad).append(",").append(maxLoad) + sb.append(",0,").append(formatDouble(maxValue)) + sb.append(",") + sb.append(minLoad).append(",").append(maxLoad) sb.append(",0,").append(formatDouble(maxTps)) sb.append("&") diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala new file mode 100644 index 0000000000..dce6017203 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -0,0 +1,220 @@ +package akka.performance.workbench + +import java.lang.management.ManagementFactory +import java.text.SimpleDateFormat +import java.util.Date + +import scala.collection.JavaConversions.asScalaBuffer +import scala.collection.JavaConversions.enumerationAsScalaIterator + +import akka.event.EventHandler +import akka.config.Config +import akka.config.Config.config + +class Report( + resultRepository: BenchResultRepository, + compareResultWith: Option[String] = None) { + + private def log = System.getProperty("benchmark.logResult", "true").toBoolean + + val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") + val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss") + + def html(statistics: Seq[Stats]): Unit = { + + val current = statistics.last + val sb = new StringBuilder + + val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp)) + sb.append(header(title)) + sb.append("

%s

\n".format(title)) + + sb.append("
\n")
+    val resultTable = formatResultsTable(statistics)
+    sb.append(resultTable)
+    sb.append("\n
\n") + + sb.append(img(percentilesAndMeanChart(current))) + sb.append(img(latencyAndThroughputChart(current))) + + for (stats ← statistics) { + compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) + } + + for (stats ← statistics) { + comparePercentilesAndMeanChart(stats).foreach(url ⇒ sb.append(img(url))) + } + + sb.append("
\n") + sb.append("
\n")
+    sb.append(systemInformation)
+    sb.append("\n
\n") + + val timestamp = fileTimestampFormat.format(new Date(current.timestamp)) + val reportName = current.name + "--" + timestamp + ".html" + resultRepository.saveHtmlReport(sb.toString, reportName) + + if (log) { + EventHandler.info(this, resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName)) + } + + } + + def img(url: String): String = { + """""".format( + url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n" + } + + def percentilesAndMeanChart(stats: Stats): String = { + val chartTitle = stats.name + " Percentiles and Mean (microseconds)" + val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients") + chartUrl + } + + def comparePercentilesAndMeanChart(stats: Stats): Seq[String] = { + for { + compareName ← compareResultWith.toSeq + compareStats ← resultRepository.get(compareName, stats.load) + } yield { + val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles and Mean (microseconds)" + val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(Seq(compareStats, stats), chartTitle, _.name) + chartUrl + } + } + + def compareWithHistoricalPercentiliesAndMeanChart(stats: Stats): Option[String] = { + val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load) + if (withHistorical.size > 1) { + val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles and Mean (microseconds)" + val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(withHistorical, chartTitle, + stats ⇒ legendTimeFormat.format(new Date(stats.timestamp))) + Some(chartUrl) + } else { + None + } + } + + def latencyAndThroughputChart(stats: Stats): String = { + val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)" + val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle) + chartUrl + } + + def formatResultsTable(statsSeq: Seq[Stats]): String = { + + val name = statsSeq.head.name + + val spaces = " " + val headerScenarioCol = ("Scenario" + spaces).take(name.length) + + val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil) + .mkString("\t") + val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil) + .mkString("\t") + val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString + val formattedStats = "\n" + + line.replace('-', '=') + "\n" + + headerLine + "\n" + + headerLine2 + "\n" + + line + "\n" + + statsSeq.map(formatStats(_)).mkString("\n") + "\n" + + line + "\n" + + formattedStats + + } + + def formatStats(stats: Stats): String = { + val durationS = stats.durationNanos.toDouble / 1000000000.0 + val duration = durationS.formatted("%.0f") + + val tpsStr = stats.tps.formatted("%.0f") + val meanStr = stats.mean.formatted("%.0f") + + val summaryLine = + stats.name :: + stats.load.toString :: + tpsStr :: + meanStr :: + stats.percentiles(5).toString :: + stats.percentiles(25).toString :: + stats.percentiles(50).toString :: + stats.percentiles(75).toString :: + stats.percentiles(95).toString :: + duration :: + stats.n.toString :: + Nil + + summaryLine.mkString("\t") + + } + + def systemInformation: String = { + val runtime = ManagementFactory.getRuntimeMXBean + val os = ManagementFactory.getOperatingSystemMXBean + val threads = ManagementFactory.getThreadMXBean + val mem = ManagementFactory.getMemoryMXBean + val heap = mem.getHeapMemoryUsage + + val sb = new StringBuilder + + sb.append("Benchmark properties:") + import scala.collection.JavaConversions._ + val propNames: Seq[String] = System.getProperties.propertyNames.toSeq.map(_.toString) + for (name ← propNames if name.startsWith("benchmark")) { + sb.append("\n ").append(name).append("=").append(System.getProperty(name)) + } + sb.append("\n") + + sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion) + sb.append("\n") + sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor). + append(" ").append(runtime.getVmVersion) + sb.append("\n") + sb.append("Processors: ").append(os.getAvailableProcessors) + sb.append("\n") + sb.append("Load average: ").append(os.getSystemLoadAverage) + sb.append("\n") + sb.append("Thread count: ").append(threads.getThreadCount).append(" (").append(threads.getPeakThreadCount).append(")") + sb.append("\n") + sb.append("Heap: ").append(formatDouble(heap.getUsed.toDouble / 1024 / 1024)). + append(" (").append(formatDouble(heap.getInit.toDouble / 1024 / 1024)). + append(" - "). + append(formatDouble(heap.getMax.toDouble / 1024 / 1024)). + append(")").append(" MB") + sb.append("\n") + + val args = runtime.getInputArguments.filterNot(_.contains("classpath")).mkString("\n ") + sb.append("Args:\n ").append(args) + sb.append("\n") + + sb.append("Akka version: ").append(Config.CONFIG_VERSION) + sb.append("\n") + sb.append("Akka config:") + for (key ← config.keys) { + sb.append("\n ").append(key).append("=").append(config(key)) + } + + sb.toString + } + + def formatDouble(value: Double): String = { + new java.math.BigDecimal(value).setScale(2, java.math.RoundingMode.HALF_EVEN).toString + } + + def header(title: String) = + """| + | + | + | + |%s + | + | + |""".stripMargin.format(title) + + def footer = + """|" + |""".stripMargin + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Stats.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala similarity index 84% rename from akka-actor-tests/src/test/scala/akka/performance/trading/common/Stats.scala rename to akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala index 1b1b854cb0..c307d997e3 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/Stats.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Stats.scala @@ -1,7 +1,8 @@ -package akka.performance.trading.common +package akka.performance.workbench import scala.collection.immutable.TreeMap +@SerialVersionUID(1L) case class Stats( name: String, load: Int, diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 0a0e00e2cc..4b1fc2e1aa 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -8,7 +8,7 @@ import akka.actor._ import akka.dispatch.Dispatchers import akka.config.Config._ import akka.config.ConfigurationException -import akka.util.{ ListenerManagement, ReflectiveAccess } +import akka.util.{ListenerManagement, ReflectiveAccess} import akka.serialization._ import akka.AkkaException @@ -26,7 +26,7 @@ import akka.AkkaException * case EventHandler.Info(instance, message) ⇒ ... * case EventHandler.Debug(instance, message) ⇒ ... * case genericEvent ⇒ ... - * } + * } * }) * * EventHandler.addListener(eventHandlerListener) @@ -95,10 +95,10 @@ object EventHandler extends ListenerManagement { @volatile var level: Int = config.getString("akka.event-handler-level", "INFO") match { - case "ERROR" ⇒ ErrorLevel + case "ERROR" ⇒ ErrorLevel case "WARNING" ⇒ WarningLevel - case "INFO" ⇒ InfoLevel - case "DEBUG" ⇒ DebugLevel + case "INFO" ⇒ InfoLevel + case "DEBUG" ⇒ DebugLevel case unknown ⇒ throw new ConfigurationException( "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") } @@ -106,21 +106,22 @@ object EventHandler extends ListenerManagement { def start() { try { val defaultListeners = config.getList("akka.event-handlers") match { - case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil + case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil case listeners ⇒ listeners } - defaultListeners foreach { listenerName ⇒ - try { - ReflectiveAccess.getClassFor[Actor](listenerName) match { - case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start()) - case Left(exception) ⇒ throw exception + defaultListeners foreach { + listenerName ⇒ + try { + ReflectiveAccess.getClassFor[Actor](listenerName) match { + case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start()) + case Left(exception) ⇒ throw exception + } + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + listenerName + + "] due to [" + e.toString + "]", e) } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + listenerName + - "] due to [" + e.toString + "]", e) - } } info(this, "Starting up EventHandler") } catch { @@ -145,7 +146,7 @@ object EventHandler extends ListenerManagement { notifyListeners(event) } - def notify[T <: Event: ClassManifest](event: ⇒ T) { + def notify[T <: Event : ClassManifest](event: ⇒ T) { if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event) } @@ -181,6 +182,7 @@ object EventHandler extends ListenerManagement { if (level >= InfoLevel) notifyListeners(Info(instance, message)) } + def debug(instance: AnyRef, message: ⇒ String) { if (level >= DebugLevel) notifyListeners(Debug(instance, message)) } @@ -194,7 +196,7 @@ object EventHandler extends ListenerManagement { def isDebugEnabled = level >= DebugLevel def stackTraceFor(e: Throwable) = { - import java.io.{ StringWriter, PrintWriter } + import java.io.{StringWriter, PrintWriter} val sw = new StringWriter val pw = new PrintWriter(sw) e.printStackTrace(pw) @@ -210,6 +212,7 @@ object EventHandler extends ListenerManagement { } class DefaultListener extends Actor { + import java.text.SimpleDateFormat import java.util.Date diff --git a/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala index 95492a30f5..f214e12f52 100644 --- a/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-cluster/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -6,6 +6,7 @@ package akka.remote import akka.actor.{ Actor, BootableActorLoaderService } import akka.util.{ ReflectiveAccess, Bootable } +import akka.event.EventHandler /** * This bundle/service is responsible for booting up and shutting down the remote actors facility. @@ -23,14 +24,19 @@ trait BootableRemoteActorService extends Bootable { abstract override def onLoad() { if (ReflectiveAccess.ClusterModule.isEnabled && RemoteServerSettings.isRemotingEnabled) { + EventHandler.info(this, "Initializing Remote Actors Service...") startRemoteService() + EventHandler.info(this, "Remote Actors Service initialized") } super.onLoad() } abstract override def onUnload() { + EventHandler.info(this, "Shutting down Remote Actors Service") + Actor.remote.shutdown() if (remoteServerThread.isAlive) remoteServerThread.join(1000) + EventHandler.info(this, "Remote Actors Service has been shut down") super.onUnload() } } diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 36e9546d1e..863e2b4917 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,24 +4,22 @@ package akka.remote.netty -import akka.dispatch.{ ActorPromise, DefaultPromise, Promise, Future } -import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings } +import akka.dispatch.{ActorPromise, DefaultPromise, Promise} +import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import akka.remote.protocol.RemoteProtocol._ import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization._ import akka.remoteinterface._ import akka.actor.{ - PoisonPill, - LocalActorRef, - Actor, - RemoteActorRef, - ActorRef, - IllegalActorStateException, - RemoteActorSystemMessage, - uuidFrom, - Uuid, - Death, - LifeCycleMessage +PoisonPill, +Actor, +RemoteActorRef, +ActorRef, +IllegalActorStateException, +RemoteActorSystemMessage, +uuidFrom, +Uuid, +LifeCycleMessage } import akka.actor.Actor._ import akka.config.Config @@ -30,23 +28,22 @@ import akka.util._ import akka.event.EventHandler import org.jboss.netty.channel._ -import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } +import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup, ChannelGroupFuture} import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap } -import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } -import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } -import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } -import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } -import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } +import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap} +import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} +import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder} +import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} +import org.jboss.netty.handler.timeout.{ReadTimeoutHandler, ReadTimeoutException} +import org.jboss.netty.handler.execution.{OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler} +import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer} import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ import java.net.InetSocketAddress -import java.lang.reflect.InvocationTargetException -import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } +import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} import java.util.concurrent._ import akka.AkkaException @@ -66,7 +63,8 @@ object RemoteEncoder { } } -trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement ⇒ +trait NettyRemoteClientModule extends RemoteClientModule { + self: ListenerManagement ⇒ private val remoteClients = new HashMap[Address, RemoteClient] private val remoteActors = new Index[Address, Uuid] private val lock = new ReadWriteGuard @@ -82,7 +80,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef)) private[akka] def withClientFor[T]( - address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient ⇒ T): T = { + address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient ⇒ T): T = { // loader.foreach(MessageSerializer.setClassLoader(_)) val key = Address(address) lock.readLock.lock @@ -94,7 +92,8 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem lock.writeLock.lock //Lock upgrade, not supported natively try { try { - remoteClients.get(key) match { //Recheck for addition, race between upgrades + remoteClients.get(key) match { + //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) @@ -102,24 +101,30 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem remoteClients += key -> client client } - } finally { lock.readLock.lock } //downgrade - } finally { lock.writeLock.unlock } + } finally { + lock.readLock.lock + } //downgrade + } finally { + lock.writeLock.unlock + } } fun(c) - } finally { lock.readLock.unlock } + } finally { + lock.readLock.unlock + } } def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { remoteClients.remove(Address(address)) match { case Some(client) ⇒ client.shutdown() - case None ⇒ false + case None ⇒ false } } def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard { remoteClients.get(Address(address)) match { case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true) - case None ⇒ false + case None ⇒ false } } @@ -133,7 +138,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem } def shutdownRemoteClients() = lock withWriteGuard { - remoteClients.foreach({ case (addr, client) ⇒ client.shutdown() }) + remoteClients.foreach({ + case (addr, client) ⇒ client.shutdown() + }) remoteClients.clear() } } @@ -143,9 +150,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem * ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that * reuses an already established connection. */ -abstract class RemoteClient private[akka] ( - val module: NettyRemoteClientModule, - val remoteAddress: InetSocketAddress) { +abstract class RemoteClient private[akka]( + val module: NettyRemoteClientModule, + val remoteAddress: InetSocketAddress) { val useTransactionLog = config.getBool("akka.cluster.client.buffering.retry-message-send-on-failure", true) val transactionLogCapacity = config.getInt("akka.cluster.client.buffering.capacity", -1) @@ -189,13 +196,13 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send[T]( - message: Any, - senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]], - remoteAddress: InetSocketAddress, - timeout: Long, - isOneWay: Boolean, - actorRef: ActorRef): Option[Promise[T]] = + message: Any, + senderOption: Option[ActorRef], + senderFuture: Option[Promise[T]], + remoteAddress: InetSocketAddress, + timeout: Long, + isOneWay: Boolean, + actorRef: ActorRef): Option[Promise[T]] = send(createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build, senderFuture) @@ -204,8 +211,8 @@ abstract class RemoteClient private[akka] ( * Sends the message across the wire */ def send[T]( - request: RemoteMessageProtocol, - senderFuture: Option[Promise[T]]): Option[Promise[T]] = { + request: RemoteMessageProtocol, + senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { EventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request)) @@ -266,20 +273,23 @@ abstract class RemoteClient private[akka] ( } } - private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log + private[remote] def sendPendingRequests() = pendingRequests synchronized { + // ensure only one thread at a time can flush the log val nrOfMessages = pendingRequests.size if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages) var pendingRequest = pendingRequests.peek while (pendingRequest ne null) { val (isOneWay, futureUuid, message) = pendingRequest - if (isOneWay) { // sendOneWay + if (isOneWay) { + // sendOneWay val future = currentChannel.write(RemoteEncoder.encode(message)) future.awaitUninterruptibly() if (!future.isCancelled && !future.isSuccess) { notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress)) throw future.getCause } - } else { // sendRequestReply + } else { + // sendRequestReply val future = currentChannel.write(RemoteEncoder.encode(message)) future.awaitUninterruptibly() if (future.isCancelled) futures.remove(futureUuid) // Clean up future @@ -300,9 +310,10 @@ abstract class RemoteClient private[akka] ( * * @author Jonas Bonér */ -class ActiveRemoteClient private[akka] ( - module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, - val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) extends RemoteClient(module, remoteAddress) { +class ActiveRemoteClient private[akka]( + module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, + val loader: Option[ClassLoader] = None, notifyListenersFun: (⇒ Any) ⇒ Unit) extends RemoteClient(module, remoteAddress) { + import RemoteClientSettings._ //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @@ -318,6 +329,7 @@ class ActiveRemoteClient private[akka] ( private var reconnectionTimeWindowStart = 0L def notifyListeners(msg: ⇒ Any): Unit = notifyListenersFun(msg) + def currentChannel = connection.getChannel def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { @@ -330,15 +342,18 @@ class ActiveRemoteClient private[akka] ( bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) + EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress)) + + // Wait until the connection attempt succeeds or fails. connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) + EventHandler.error(connection.getCause, "Remote client connection to [%s] has failed".format(remoteAddress), this) false } else { - //Send cookie val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SECURE_COOKIE.nonEmpty) @@ -365,12 +380,16 @@ class ActiveRemoteClient private[akka] ( } match { case true ⇒ true case false if reconnectIfAlreadyConnected ⇒ + EventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) + openChannels.remove(connection.getChannel) connection.getChannel.close connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) + EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress),this) + false } else { //Send cookie @@ -387,6 +406,8 @@ class ActiveRemoteClient private[akka] ( //Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients def shutdown() = runSwitch switchOff { + EventHandler.info(this, "Shutting down [%s]".format(name)) + notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop() timer = null @@ -396,6 +417,8 @@ class ActiveRemoteClient private[akka] ( bootstrap = null connection = null pendingRequests.clear() + + EventHandler.info(this, "[%s] has been shut down".format(name)) } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -403,7 +426,11 @@ class ActiveRemoteClient private[akka] ( reconnectionTimeWindowStart = System.currentTimeMillis true } else { - /*Time left > 0*/ (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 + val timeLeft = (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 + if (timeLeft) { + EventHandler.info(this, "Will try to reconnect to remote server for another [%s] milliseconds".format(timeLeft)) + } + timeLeft } } @@ -414,12 +441,12 @@ class ActiveRemoteClient private[akka] ( * @author Jonas Bonér */ class ActiveRemoteClientPipelineFactory( - name: String, - futures: ConcurrentMap[Uuid, Promise[_]], - bootstrap: ClientBootstrap, - remoteAddress: InetSocketAddress, - timer: HashedWheelTimer, - client: ActiveRemoteClient) extends ChannelPipelineFactory { + name: String, + futures: ConcurrentMap[Uuid, Promise[_]], + bootstrap: ClientBootstrap, + remoteAddress: InetSocketAddress, + timer: HashedWheelTimer, + client: ActiveRemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit) @@ -429,7 +456,7 @@ class ActiveRemoteClientPipelineFactory( val protobufEnc = new ProtobufEncoder val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { case "zlib" ⇒ (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) - case _ ⇒ (Nil, Nil) + case _ ⇒ (Nil, Nil) } val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client) @@ -443,12 +470,12 @@ class ActiveRemoteClientPipelineFactory( */ @ChannelHandler.Sharable class ActiveRemoteClientHandler( - val name: String, - val futures: ConcurrentMap[Uuid, Promise[_]], - val bootstrap: ClientBootstrap, - val remoteAddress: InetSocketAddress, - val timer: HashedWheelTimer, - val client: ActiveRemoteClient) + val name: String, + val futures: ConcurrentMap[Uuid, Promise[_]], + val bootstrap: ClientBootstrap, + val remoteAddress: InetSocketAddress, + val timer: HashedWheelTimer, + val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { @@ -457,11 +484,16 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒ val rcp = arp.getInstruction rcp.getCommandType match { - case CommandType.SHUTDOWN ⇒ spawn { client.module.shutdownClientConnection(remoteAddress) } + case CommandType.SHUTDOWN ⇒ spawn { + client.module.shutdownClientConnection(remoteAddress) + } } case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ val reply = arp.getMessage val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) + EventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]".format(reply)) + EventHandler.debug(this, "Trying to map back to future: %s".format(replyUuid)) + futures.remove(replyUuid).asInstanceOf[Promise[Any]] match { case null => client.notifyListeners(RemoteClientError(new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist"), client.module, client.remoteAddress)) @@ -495,13 +527,16 @@ class ActiveRemoteClientHandler( } } }, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) - } else spawn { client.module.shutdownClientConnection(remoteAddress) } + } else spawn { + client.module.shutdownClientConnection(remoteAddress) + } } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { try { if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) + EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress)) client.resetReconnectionTimeWindow } catch { case e: Throwable ⇒ @@ -513,12 +548,20 @@ class ActiveRemoteClientHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress)) + EventHandler.debug(this, "Remote client disconnected from [%s]".format(ctx.getChannel.getRemoteAddress)) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + if (event.getCause ne null) + EventHandler.error(event.getCause, "Unexpected exception from downstream in remote client", this) + else + EventHandler.error(this, "Unexpected exception from downstream in remote client: %s".format(event)) + event.getCause match { case e: ReadTimeoutException ⇒ - spawn { client.module.shutdownClientConnection(remoteAddress) } + spawn { + client.module.shutdownClientConnection(remoteAddress) + } case e ⇒ client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) event.getChannel.close //FIXME Is this the correct behavior? @@ -553,17 +596,18 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with def optimizeLocalScoped_?() = optimizeLocal.get protected[akka] def actorFor( - actorAddress: String, - timeout: Long, - host: String, - port: Int, - loader: Option[ClassLoader]): ActorRef = { + actorAddress: String, + timeout: Long, + host: String, + port: Int, + loader: Option[ClassLoader]): ActorRef = { val homeInetSocketAddress = this.address if (optimizeLocalScoped_?) { if ((host == homeInetSocketAddress.getAddress.getHostAddress || host == homeInetSocketAddress.getHostName) && - port == homeInetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals? + port == homeInetSocketAddress.getPort) { + //TODO: switch to InetSocketAddress.equals? val localRef = findActorByAddressOrUuid(actorAddress, actorAddress) if (localRef ne null) return localRef //Code significantly simpler with the return statement } @@ -578,7 +622,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with } class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { + import RemoteServerSettings._ + val name = "NettyRemoteServer@" + host + ":" + port val address = new InetSocketAddress(host, port) @@ -629,14 +675,14 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, } } -trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule ⇒ - import RemoteServerSettings._ +trait NettyRemoteServerModule extends RemoteServerModule { + self: RemoteModule ⇒ private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) def address = currentServer.get match { case Some(server) ⇒ server.address - case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress + case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress } def name = currentServer.get match { @@ -653,6 +699,8 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard { try { _isRunning switchOn { + EventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port)) + currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader))) } } catch { @@ -665,8 +713,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule def shutdownServerModule() = guard withGuard { _isRunning switchOff { - currentServer.getAndSet(None) foreach { instance ⇒ - instance.shutdown() + currentServer.getAndSet(None) foreach { + instance ⇒ + EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port)) + + instance.shutdown() } } } @@ -710,7 +761,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID). */ def unregister(actorRef: ActorRef): Unit = guard withGuard { + if (_isRunning.isOn) { + EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid)) + actors.remove(actorRef.address, actorRef) actorsByUuid.remove(actorRef.uuid.toString, actorRef) } @@ -722,7 +776,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * NOTE: You need to call this method if you have registered an actor by a custom ID. */ def unregister(id: String): Unit = guard withGuard { + if (_isRunning.isOn) { + EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id)) + if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length)) else { val actorRef = actors get id @@ -738,7 +795,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * NOTE: You need to call this method if you have registered an actor by a custom ID. */ def unregisterPerSession(id: String): Unit = { + if (_isRunning.isOn) { + EventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id)) + actorsFactories.remove(id) } } @@ -748,11 +808,12 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule * @author Jonas Bonér */ class RemoteServerPipelineFactory( - val name: String, - val openChannels: ChannelGroup, - val executor: ExecutionHandler, - val loader: Option[ClassLoader], - val server: NettyRemoteServerModule) extends ChannelPipelineFactory { + val name: String, + val openChannels: ChannelGroup, + val executor: ExecutionHandler, + val loader: Option[ClassLoader], + val server: NettyRemoteServerModule) extends ChannelPipelineFactory { + import RemoteServerSettings._ def getPipeline: ChannelPipeline = { @@ -762,7 +823,7 @@ class RemoteServerPipelineFactory( val protobufEnc = new ProtobufEncoder val (enc, dec) = COMPRESSION_SCHEME match { case "zlib" ⇒ (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) - case _ ⇒ (Nil, Nil) + case _ ⇒ (Nil, Nil) } val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) @@ -802,10 +863,11 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si */ @ChannelHandler.Sharable class RemoteServerHandler( - val name: String, - val openChannels: ChannelGroup, - val applicationLoader: Option[ClassLoader], - val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { + val name: String, + val openChannels: ChannelGroup, + val applicationLoader: Option[ClassLoader], + val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { + import RemoteServerSettings._ // applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY @@ -822,7 +884,7 @@ class RemoteServerHandler( } else if (!future.isSuccess) { val socketAddress = future.getChannel.getRemoteAddress match { case i: InetSocketAddress ⇒ Some(i) - case _ ⇒ None + case _ ⇒ None } server.notifyListeners(RemoteServerWriteFailed(payload, future.getCause, server, socketAddress)) } @@ -838,6 +900,8 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + EventHandler.debug(this,"Remote client [%s] connected to [%s]".format(clientAddress, server.name)) + sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) } @@ -845,12 +909,18 @@ class RemoteServerHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + EventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name)) + // stop all session actors for ( map ← Option(sessionActors.remove(event.getChannel)); actor ← collectionAsScalaIterable(map.values) ) { - try { actor ! PoisonPill } catch { case e: Exception ⇒ } + try { + actor ! PoisonPill + } catch { + case e: Exception ⇒ EventHandler.error(e, "Couldn't stop %s".format(actor),this) + } } server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) @@ -858,6 +928,8 @@ class RemoteServerHandler( override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) + EventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name),this) + server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) } @@ -873,6 +945,8 @@ class RemoteServerHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + EventHandler.error(event.getCause, "Unexpected exception from remote downstream", this) + event.getChannel.close server.notifyListeners(RemoteServerError(event.getCause, server)) } @@ -880,7 +954,7 @@ class RemoteServerHandler( private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] = ctx.getChannel.getRemoteAddress match { case inet: InetSocketAddress ⇒ Some(inet) - case _ ⇒ None + case _ ⇒ None } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try { @@ -894,8 +968,13 @@ class RemoteServerHandler( private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) { val actorInfo = request.getActorInfo + + EventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid)) + val actorRef = - try { createActor(actorInfo, channel) } catch { + try { + createActor(actorInfo, channel) + } catch { case e: SecurityException ⇒ EventHandler.error(e, this, e.getMessage) write(channel, createErrorReplyMessage(e, request)) @@ -908,7 +987,8 @@ class RemoteServerHandler( if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) else None - message match { // first match on system messages + message match { + // first match on system messages case RemoteActorSystemMessage.Stop ⇒ if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop() @@ -923,22 +1003,22 @@ class RemoteServerHandler( request.getActorInfo.getTimeout, new ActorPromise(request.getActorInfo.getTimeout). onComplete(_.value.get match { - case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) - case r: Right[_,_] ⇒ - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( - Some(actorRef), - Right(request.getUuid), - actorInfo.getAddress, - actorInfo.getTimeout, - r.asInstanceOf[Either[Throwable,Any]], - isOneWay = true, - Some(actorRef)) + case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) + case r: Right[_, _] ⇒ + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + Some(actorRef), + Right(request.getUuid), + actorInfo.getAddress, + actorInfo.getTimeout, + r.asInstanceOf[Either[Throwable, Any]], + isOneWay = true, + Some(actorRef)) - // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method + if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) - write(channel, RemoteEncoder.encode(messageBuilder.build)) - })) + write(channel, RemoteEncoder.encode(messageBuilder.build)) + })) } } @@ -988,7 +1068,7 @@ class RemoteServerHandler( private def findSessionActor(id: String, channel: Channel): ActorRef = sessionActors.get(channel) match { case null ⇒ null - case map ⇒ map get id + case map ⇒ map get id } private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = { diff --git a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala index d8b1293bc6..810f813efb 100644 --- a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -21,6 +21,7 @@ import java.net.InetSocketAddress import com.google.protobuf.ByteString import com.eaio.uuid.UUID +import akka.event.EventHandler /** * Module for local actor serialization. @@ -142,6 +143,8 @@ object ActorSerialization { overriddenUuid: Option[UUID], loader: Option[ClassLoader]): ActorRef = { + EventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol)) + val lifeCycle = if (protocol.hasLifeCycle) { protocol.getLifeCycle.getLifeCycle match { @@ -243,11 +246,17 @@ object RemoteActorSerialization { * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - RemoteActorRef( + EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) + + val ref = RemoteActorRef( JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress], protocol.getAddress, protocol.getTimeout, loader) + + EventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) + + ref } /** @@ -263,6 +272,9 @@ object RemoteActorSerialization { case _ ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress } + + EventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) + RemoteActorRefProtocol.newBuilder .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) .setAddress(actor.address) diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index b610bff96f..507c3a3b88 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -228,11 +228,11 @@ A child actor can tell the supervising actor to unlink him by sending him the 'U .. code-block:: scala - if (supervisor.isDefined) supervisor.get ! Unlink(this) + if (supervisor.isDefined) supervisor.get ! Unlink(self) // Or shorter using 'foreach': - supervisor.foreach(_ ! Unlink(this)) + supervisor.foreach(_ ! Unlink(self)) The supervising actor's side of things ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^