Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-07-22 14:45:49 +02:00
commit ae35e61a02
16 changed files with 541 additions and 358 deletions

View file

@ -29,12 +29,14 @@ trait BenchmarkScenarios extends PerformanceTest {
def complexScenario80 = complexScenario(80)
@Test
def complexScenario100 = complexScenario(100)
/*
@Test
def complexScenario200 = complexScenario(200)
@Test
def complexScenario300 = complexScenario(300)
@Test
def complexScenario400 = complexScenario(400)
*/
def complexScenario(numberOfClients: Int) {
Assume.assumeTrue(numberOfClients >= minClients)

View file

@ -7,11 +7,13 @@ import akka.event.EventHandler
trait OrderReceiver {
type ME
val matchingEngines: List[ME]
var matchingEnginePartitionsIsStale = true
var matchingEngineForOrderbook: Map[String, ME] = Map()
def refreshMatchingEnginePartitions() {
def refreshMatchingEnginePartitions(routing: MatchingEngineRouting[ME]) {
val matchingEngines: List[ME] = routing.mapping.keys.toList
def supportedOrderbooks(me: ME): List[String] = routing.mapping(me)
val m = Map() ++
(for {
me matchingEngines
@ -19,14 +21,11 @@ trait OrderReceiver {
} yield (orderbookSymbol, me))
matchingEngineForOrderbook = m
matchingEnginePartitionsIsStale = false
}
def supportedOrderbooks(me: ME): List[String]
}
class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher])
class AkkaOrderReceiver(disp: Option[MessageDispatcher])
extends Actor with OrderReceiver {
type ME = ActorRef
@ -34,21 +33,13 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp
self.dispatcher = d
}
override val matchingEngines: List[ActorRef] = matchingEngineRouting.keys.toList
override def preStart() {
refreshMatchingEnginePartitions()
}
def receive = {
case routing@MatchingEngineRouting(mapping)
refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]])
case order: Order placeOrder(order)
case unknown EventHandler.warning(this, "Received unknown message: " + unknown)
}
override def supportedOrderbooks(me: ActorRef): List[String] = {
matchingEngineRouting(me)
}
def placeOrder(order: Order) = {
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)
matchingEngine match {
@ -60,3 +51,5 @@ class AkkaOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp
}
}
}
case class MatchingEngineRouting[ME](mapping: Map[ME, List[String]])

View file

@ -1,7 +1,5 @@
package akka.performance.trading.common
import java.text.SimpleDateFormat
import java.util.Date
import java.util.Random
import scala.collection.immutable.TreeMap
@ -12,11 +10,13 @@ import org.junit.After
import org.junit.Before
import org.scalatest.junit.JUnitSuite
import akka.event.EventHandler
import akka.performance.trading.domain.Ask
import akka.performance.trading.domain.Bid
import akka.performance.trading.domain.Order
import akka.performance.trading.domain.TotalTradeCounter
import akka.performance.workbench.BenchResultRepository
import akka.performance.workbench.Report
import akka.performance.workbench.Stats
trait PerformanceTest extends JUnitSuite {

View file

@ -1,179 +0,0 @@
package akka.performance.trading.common
import java.io.File
import java.text.SimpleDateFormat
import java.io.PrintWriter
import java.io.FileWriter
import akka.event.EventHandler
import java.util.Date
class Report(
resultRepository: BenchResultRepository,
compareResultWith: Option[String] = None) {
private val dir = System.getProperty("benchmark.resultDir", "target/benchmark")
private def dirExists: Boolean = new File(dir).exists
private def log = System.getProperty("benchmark.logResult", "false").toBoolean
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss")
def html(statistics: Seq[Stats]): Unit = if (dirExists) {
val current = statistics.last
val sb = new StringBuilder
val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp))
sb.append(header(title))
sb.append("<h1>%s</h1>\n".format(title))
sb.append("<pre>\n")
sb.append(formatResultsTable(statistics))
sb.append("\n</pre>\n")
sb.append(img(percentilesChart(current)))
sb.append(img(latencyAndThroughputChart(current)))
for (stats statistics) {
compareWithHistoricalPercentiliesChart(stats).foreach(url sb.append(img(url)))
}
for (stats statistics) {
comparePercentilesChart(stats).foreach(url sb.append(img(url)))
}
if (dirExists) {
val timestamp = fileTimestampFormat.format(new Date(current.timestamp))
val name = current.name + "--" + timestamp + ".html"
write(sb.toString, name)
}
}
private def img(url: String): String = {
"""<img src="%s" border="0" width="%s" height="%s" />""".format(
url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n"
}
def percentilesChart(stats: Stats): String = {
val chartTitle = stats.name + " Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def comparePercentilesChart(stats: Stats): Seq[String] = {
for {
compareName compareResultWith.toSeq
compareStats resultRepository.get(compareName, stats.load)
} yield {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(Seq(compareStats, stats), chartTitle, _.name)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
}
def compareWithHistoricalPercentiliesChart(stats: Stats): Option[String] = {
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
if (withHistorical.size > 1) {
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles (microseconds)"
val chartUrl = GoogleChartBuilder.percentilChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
Some(chartUrl)
} else {
None
}
}
def latencyAndThroughputChart(stats: Stats): String = {
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
if (log) EventHandler.info(this, chartTitle + " Chart:\n" + chartUrl)
chartUrl
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
if (log) EventHandler.info(this, formattedStats)
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
}
def write(content: String, fileName: String) {
val f = new File(dir, fileName)
var writer: PrintWriter = null
try {
writer = new PrintWriter(new FileWriter(f))
writer.print(content)
writer.flush()
} catch {
case e: Exception
EventHandler.error(this, "Failed to save report to [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
} finally {
if (writer ne null) try { writer.close() } catch { case ignore: Exception }
}
}
def header(title: String) =
"""|<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
|<html>
|<head>
|
|<title>%s</title>
|</head>
|<body>
|""".stripMargin.format(title)
def footer =
"""|</body>"
|</html>""".stripMargin
}

View file

@ -75,7 +75,7 @@ class AkkaTradingSystem extends TradingSystem {
(1 to 10).toList map (i createOrderReceiver())
}
def matchingEngineRouting: Map[ActorRef, List[String]] = {
def matchingEngineRouting: MatchingEngineRouting[ActorRef] = {
val rules =
for {
info matchingEngines
@ -84,11 +84,11 @@ class AkkaTradingSystem extends TradingSystem {
(info.primary, orderbookSymbols)
}
Map() ++ rules
MatchingEngineRouting(Map() ++ rules)
}
def createOrderReceiver() =
actorOf(new AkkaOrderReceiver(matchingEngineRouting, orDispatcher))
actorOf(new AkkaOrderReceiver(orDispatcher))
override def start() {
for (MatchingEngineInfo(p, s, o) matchingEngines) {
@ -97,7 +97,11 @@ class AkkaTradingSystem extends TradingSystem {
s.foreach(_.start())
s.foreach(p ! _)
}
orderReceivers.foreach(_.start())
val routing = matchingEngineRouting
for (or orderReceivers) {
or.start()
or ! routing
}
}
override def shutdown() {

View file

@ -6,8 +6,8 @@ import akka.event.EventHandler
import akka.performance.trading.domain._
import akka.performance.trading.common.AkkaOrderReceiver
class OneWayOrderReceiver(matchingEngineRouting: Map[ActorRef, List[String]], disp: Option[MessageDispatcher])
extends AkkaOrderReceiver(matchingEngineRouting, disp) {
class OneWayOrderReceiver(disp: Option[MessageDispatcher])
extends AkkaOrderReceiver(disp) {
override def placeOrder(order: Order) = {
val matchingEngine = matchingEngineForOrderbook.get(order.orderbookSymbol)

View file

@ -11,6 +11,6 @@ class OneWayTradingSystem extends AkkaTradingSystem {
actorOf(new OneWayMatchingEngine(meId, orderbooks, meDispatcher))
override def createOrderReceiver() =
actorOf(new OneWayOrderReceiver(matchingEngineRouting, orDispatcher))
actorOf(new OneWayOrderReceiver(orDispatcher))
}

View file

@ -1,16 +1,18 @@
package akka.performance.trading.common
package akka.performance.workbench
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.FileWriter
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.PrintWriter
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.{ Map MutableMap }
import scala.collection.mutable.{Map => MutableMap}
import akka.event.EventHandler
@ -23,6 +25,10 @@ trait BenchResultRepository {
def getWithHistorical(name: String, load: Int): Seq[Stats]
def saveHtmlReport(content: String, name: String): Unit
def htmlReportUrl(name: String): String
}
object BenchResultRepository {
@ -34,8 +40,10 @@ class FileBenchResultRepository extends BenchResultRepository {
private val statsByName = MutableMap[String, Seq[Stats]]()
private val baselineStats = MutableMap[Key, Stats]()
private val historicalStats = MutableMap[Key, Seq[Stats]]()
private val dir = System.getProperty("benchmark.resultDir", "target/benchmark")
private def dirExists: Boolean = new File(dir).exists
private val serDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/ser"
private def serDirExists: Boolean = new File(serDir).exists
private val htmlDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/html"
private def htmlDirExists: Boolean = new File(htmlDir).exists
protected val maxHistorical = 7
case class Key(name: String, load: Int)
@ -64,10 +72,10 @@ class FileBenchResultRepository extends BenchResultRepository {
}
private def loadFiles() {
if (dirExists) {
if (serDirExists) {
val files =
for {
f new File(dir).listFiles
f new File(serDir).listFiles
if f.isFile
if f.getName.endsWith(".ser")
} yield f
@ -86,11 +94,11 @@ class FileBenchResultRepository extends BenchResultRepository {
}
private def save(stats: Stats) {
new File(dir).mkdirs
if (!dirExists) return
new File(serDir).mkdirs
if (!serDirExists) return
val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp))
val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser"
val f = new File(dir, name)
val f = new File(serDir, name)
var out: ObjectOutputStream = null
try {
out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
@ -127,5 +135,24 @@ class FileBenchResultRepository extends BenchResultRepository {
loadFiles()
def saveHtmlReport(content: String, fileName: String) {
new File(htmlDir).mkdirs
if (!htmlDirExists) return
val f = new File(htmlDir, fileName)
var writer: PrintWriter = null
try {
writer = new PrintWriter(new FileWriter(f))
writer.print(content)
writer.flush()
} catch {
case e: Exception
EventHandler.error(this, "Failed to save report to [%s], due to [%s]".
format(f.getAbsolutePath, e.getMessage))
} finally {
if (writer ne null) try { writer.close() } catch { case ignore: Exception }
}
}
def htmlReportUrl(fileName: String): String = new File(htmlDir, fileName).getAbsolutePath
}

View file

@ -1,10 +1,9 @@
package akka.performance.trading.common
package akka.performance.workbench
import java.io.UnsupportedEncodingException
import java.net.URLEncoder
import scala.collection.immutable.TreeMap
import java.util.Locale
import java.util.Formatter
/**
* Generates URLs to Google Chart API http://code.google.com/apis/chart/
@ -15,9 +14,9 @@ object GoogleChartBuilder {
val ChartHeight = 400
/**
* Builds a bar chart for all percentiles in the statistics.
* Builds a bar chart for all percentiles and the mean in the statistics.
*/
def percentilChartUrl(statistics: Seq[Stats], title: String, legend: Stats String): String = {
def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats String): String = {
if (statistics.isEmpty) return ""
val current = statistics.last
@ -38,6 +37,7 @@ object GoogleChartBuilder {
sb.append("&")
// labels
percentileLabels(current.percentiles, sb)
sb.append("|mean")
sb.append("|2:|min|mean|median")
sb.append("&")
// label positions
@ -63,7 +63,7 @@ object GoogleChartBuilder {
// data series
val maxValue = statistics.map(_.percentiles.last._2).max
sb.append("chd=t:")
dataSeries(statistics.map(_.percentiles), sb)
dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb)
// y range
sb.append("&")
@ -98,13 +98,18 @@ object GoogleChartBuilder {
sb.append(s)
}
private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], sb: StringBuilder) {
val series =
private def dataSeries(allPercentiles: Seq[TreeMap[Int, Long]], meanValues: Seq[Double], sb: StringBuilder) {
val percentileSeries =
for {
percentiles allPercentiles
} yield {
percentiles.values.mkString(",")
}
val series =
for ((s, m) percentileSeries.zip(meanValues))
yield s + "," + formatDouble(m)
sb.append(series.mkString("|"))
}
@ -144,11 +149,11 @@ object GoogleChartBuilder {
sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767")
sb.append("&")
sb.append("chco=")
val seriesColors = List("25B33B", "3072F3", "FF0000", "FF9900")
val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900")
sb.append(seriesColors.mkString(","))
sb.append("&")
// legend
sb.append("chdl=5th Percentile|Median|95th Percentile|Throughput")
sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput")
sb.append("&")
sb.append("chdlp=b")
@ -160,6 +165,7 @@ object GoogleChartBuilder {
sb.append("chls=1|1|1")
sb.append("&")
// margins
sb.append("chma=5,5,5,25")
sb.append("&")
@ -181,6 +187,11 @@ object GoogleChartBuilder {
}
sb.append(percentileSeries.mkString("|"))
sb.append("|")
sb.append(loadStr).append("|")
val meanSeries = statistics.map(s formatDouble(s.mean)).mkString(",")
sb.append(meanSeries)
sb.append("|")
val maxTps: Double = statistics.map(_.tps).max
sb.append(loadStr).append("|")
@ -192,7 +203,7 @@ object GoogleChartBuilder {
// y range
sb.append("&")
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append("|1,0,").append(maxValue).append("|2,0,")
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,")
.append(formatDouble(maxTps))
sb.append("&")
@ -203,6 +214,9 @@ object GoogleChartBuilder {
sb.append(",")
}
sb.append(minLoad).append(",").append(maxLoad)
sb.append(",0,").append(formatDouble(maxValue))
sb.append(",")
sb.append(minLoad).append(",").append(maxLoad)
sb.append(",0,").append(formatDouble(maxTps))
sb.append("&")

View file

@ -0,0 +1,220 @@
package akka.performance.workbench
import java.lang.management.ManagementFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.enumerationAsScalaIterator
import akka.event.EventHandler
import akka.config.Config
import akka.config.Config.config
class Report(
resultRepository: BenchResultRepository,
compareResultWith: Option[String] = None) {
private def log = System.getProperty("benchmark.logResult", "true").toBoolean
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fileTimestampFormat = new SimpleDateFormat("yyyyMMddHHmmss")
def html(statistics: Seq[Stats]): Unit = {
val current = statistics.last
val sb = new StringBuilder
val title = current.name + " " + dateTimeFormat.format(new Date(current.timestamp))
sb.append(header(title))
sb.append("<h1>%s</h1>\n".format(title))
sb.append("<pre>\n")
val resultTable = formatResultsTable(statistics)
sb.append(resultTable)
sb.append("\n</pre>\n")
sb.append(img(percentilesAndMeanChart(current)))
sb.append(img(latencyAndThroughputChart(current)))
for (stats statistics) {
compareWithHistoricalPercentiliesAndMeanChart(stats).foreach(url sb.append(img(url)))
}
for (stats statistics) {
comparePercentilesAndMeanChart(stats).foreach(url sb.append(img(url)))
}
sb.append("<hr/>\n")
sb.append("<pre>\n")
sb.append(systemInformation)
sb.append("\n</pre>\n")
val timestamp = fileTimestampFormat.format(new Date(current.timestamp))
val reportName = current.name + "--" + timestamp + ".html"
resultRepository.saveHtmlReport(sb.toString, reportName)
if (log) {
EventHandler.info(this, resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName))
}
}
def img(url: String): String = {
"""<img src="%s" border="0" width="%s" height="%s" />""".format(
url, GoogleChartBuilder.ChartWidth, GoogleChartBuilder.ChartHeight) + "\n"
}
def percentilesAndMeanChart(stats: Stats): String = {
val chartTitle = stats.name + " Percentiles and Mean (microseconds)"
val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(resultRepository.get(stats.name), chartTitle, _.load + " clients")
chartUrl
}
def comparePercentilesAndMeanChart(stats: Stats): Seq[String] = {
for {
compareName compareResultWith.toSeq
compareStats resultRepository.get(compareName, stats.load)
} yield {
val chartTitle = stats.name + " vs. " + compareName + ", " + stats.load + " clients" + ", Percentiles and Mean (microseconds)"
val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(Seq(compareStats, stats), chartTitle, _.name)
chartUrl
}
}
def compareWithHistoricalPercentiliesAndMeanChart(stats: Stats): Option[String] = {
val withHistorical = resultRepository.getWithHistorical(stats.name, stats.load)
if (withHistorical.size > 1) {
val chartTitle = stats.name + " vs. historical, " + stats.load + " clients" + ", Percentiles and Mean (microseconds)"
val chartUrl = GoogleChartBuilder.percentilesAndMeanChartUrl(withHistorical, chartTitle,
stats legendTimeFormat.format(new Date(stats.timestamp)))
Some(chartUrl)
} else {
None
}
}
def latencyAndThroughputChart(stats: Stats): String = {
val chartTitle = stats.name + " Latency (microseconds) and Throughput (TPS)"
val chartUrl = GoogleChartBuilder.latencyAndThroughputChartUrl(resultRepository.get(stats.name), chartTitle)
chartUrl
}
def formatResultsTable(statsSeq: Seq[Stats]): String = {
val name = statsSeq.head.name
val spaces = " "
val headerScenarioCol = ("Scenario" + spaces).take(name.length)
val headerLine = (headerScenarioCol :: "clients" :: "TPS" :: "mean" :: "5% " :: "25% " :: "50% " :: "75% " :: "95% " :: "Durat." :: "N" :: Nil)
.mkString("\t")
val headerLine2 = (spaces.take(name.length) :: " " :: " " :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(us)" :: "(s) " :: " " :: Nil)
.mkString("\t")
val line = List.fill(formatStats(statsSeq.head).replaceAll("\t", " ").length)("-").mkString
val formattedStats = "\n" +
line.replace('-', '=') + "\n" +
headerLine + "\n" +
headerLine2 + "\n" +
line + "\n" +
statsSeq.map(formatStats(_)).mkString("\n") + "\n" +
line + "\n"
formattedStats
}
def formatStats(stats: Stats): String = {
val durationS = stats.durationNanos.toDouble / 1000000000.0
val duration = durationS.formatted("%.0f")
val tpsStr = stats.tps.formatted("%.0f")
val meanStr = stats.mean.formatted("%.0f")
val summaryLine =
stats.name ::
stats.load.toString ::
tpsStr ::
meanStr ::
stats.percentiles(5).toString ::
stats.percentiles(25).toString ::
stats.percentiles(50).toString ::
stats.percentiles(75).toString ::
stats.percentiles(95).toString ::
duration ::
stats.n.toString ::
Nil
summaryLine.mkString("\t")
}
def systemInformation: String = {
val runtime = ManagementFactory.getRuntimeMXBean
val os = ManagementFactory.getOperatingSystemMXBean
val threads = ManagementFactory.getThreadMXBean
val mem = ManagementFactory.getMemoryMXBean
val heap = mem.getHeapMemoryUsage
val sb = new StringBuilder
sb.append("Benchmark properties:")
import scala.collection.JavaConversions._
val propNames: Seq[String] = System.getProperties.propertyNames.toSeq.map(_.toString)
for (name propNames if name.startsWith("benchmark")) {
sb.append("\n ").append(name).append("=").append(System.getProperty(name))
}
sb.append("\n")
sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion)
sb.append("\n")
sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor).
append(" ").append(runtime.getVmVersion)
sb.append("\n")
sb.append("Processors: ").append(os.getAvailableProcessors)
sb.append("\n")
sb.append("Load average: ").append(os.getSystemLoadAverage)
sb.append("\n")
sb.append("Thread count: ").append(threads.getThreadCount).append(" (").append(threads.getPeakThreadCount).append(")")
sb.append("\n")
sb.append("Heap: ").append(formatDouble(heap.getUsed.toDouble / 1024 / 1024)).
append(" (").append(formatDouble(heap.getInit.toDouble / 1024 / 1024)).
append(" - ").
append(formatDouble(heap.getMax.toDouble / 1024 / 1024)).
append(")").append(" MB")
sb.append("\n")
val args = runtime.getInputArguments.filterNot(_.contains("classpath")).mkString("\n ")
sb.append("Args:\n ").append(args)
sb.append("\n")
sb.append("Akka version: ").append(Config.CONFIG_VERSION)
sb.append("\n")
sb.append("Akka config:")
for (key config.keys) {
sb.append("\n ").append(key).append("=").append(config(key))
}
sb.toString
}
def formatDouble(value: Double): String = {
new java.math.BigDecimal(value).setScale(2, java.math.RoundingMode.HALF_EVEN).toString
}
def header(title: String) =
"""|<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
|<html>
|<head>
|
|<title>%s</title>
|</head>
|<body>
|""".stripMargin.format(title)
def footer =
"""|</body>"
|</html>""".stripMargin
}

View file

@ -1,7 +1,8 @@
package akka.performance.trading.common
package akka.performance.workbench
import scala.collection.immutable.TreeMap
@SerialVersionUID(1L)
case class Stats(
name: String,
load: Int,

View file

@ -8,7 +8,7 @@ import akka.actor._
import akka.dispatch.Dispatchers
import akka.config.Config._
import akka.config.ConfigurationException
import akka.util.{ ListenerManagement, ReflectiveAccess }
import akka.util.{ListenerManagement, ReflectiveAccess}
import akka.serialization._
import akka.AkkaException
@ -26,7 +26,7 @@ import akka.AkkaException
* case EventHandler.Info(instance, message) ...
* case EventHandler.Debug(instance, message) ...
* case genericEvent ...
* }
* }
* })
*
* EventHandler.addListener(eventHandlerListener)
@ -95,10 +95,10 @@ object EventHandler extends ListenerManagement {
@volatile
var level: Int = config.getString("akka.event-handler-level", "INFO") match {
case "ERROR" ErrorLevel
case "ERROR" ErrorLevel
case "WARNING" WarningLevel
case "INFO" InfoLevel
case "DEBUG" DebugLevel
case "INFO" InfoLevel
case "DEBUG" DebugLevel
case unknown throw new ConfigurationException(
"Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]")
}
@ -106,21 +106,22 @@ object EventHandler extends ListenerManagement {
def start() {
try {
val defaultListeners = config.getList("akka.event-handlers") match {
case Nil "akka.event.EventHandler$DefaultListener" :: Nil
case Nil "akka.event.EventHandler$DefaultListener" :: Nil
case listeners listeners
}
defaultListeners foreach { listenerName
try {
ReflectiveAccess.getClassFor[Actor](listenerName) match {
case Right(actorClass) addListener(Actor.localActorOf(actorClass).start())
case Left(exception) throw exception
defaultListeners foreach {
listenerName
try {
ReflectiveAccess.getClassFor[Actor](listenerName) match {
case Right(actorClass) addListener(Actor.localActorOf(actorClass).start())
case Left(exception) throw exception
}
} catch {
case e: Exception
throw new ConfigurationException(
"Event Handler specified in config can't be loaded [" + listenerName +
"] due to [" + e.toString + "]", e)
}
} catch {
case e: Exception
throw new ConfigurationException(
"Event Handler specified in config can't be loaded [" + listenerName +
"] due to [" + e.toString + "]", e)
}
}
info(this, "Starting up EventHandler")
} catch {
@ -145,7 +146,7 @@ object EventHandler extends ListenerManagement {
notifyListeners(event)
}
def notify[T <: Event: ClassManifest](event: T) {
def notify[T <: Event : ClassManifest](event: T) {
if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) notifyListeners(event)
}
@ -181,6 +182,7 @@ object EventHandler extends ListenerManagement {
if (level >= InfoLevel) notifyListeners(Info(instance, message))
}
def debug(instance: AnyRef, message: String) {
if (level >= DebugLevel) notifyListeners(Debug(instance, message))
}
@ -194,7 +196,7 @@ object EventHandler extends ListenerManagement {
def isDebugEnabled = level >= DebugLevel
def stackTraceFor(e: Throwable) = {
import java.io.{ StringWriter, PrintWriter }
import java.io.{StringWriter, PrintWriter}
val sw = new StringWriter
val pw = new PrintWriter(sw)
e.printStackTrace(pw)
@ -210,6 +212,7 @@ object EventHandler extends ListenerManagement {
}
class DefaultListener extends Actor {
import java.text.SimpleDateFormat
import java.util.Date

View file

@ -6,6 +6,7 @@ package akka.remote
import akka.actor.{ Actor, BootableActorLoaderService }
import akka.util.{ ReflectiveAccess, Bootable }
import akka.event.EventHandler
/**
* This bundle/service is responsible for booting up and shutting down the remote actors facility.
@ -23,14 +24,19 @@ trait BootableRemoteActorService extends Bootable {
abstract override def onLoad() {
if (ReflectiveAccess.ClusterModule.isEnabled && RemoteServerSettings.isRemotingEnabled) {
EventHandler.info(this, "Initializing Remote Actors Service...")
startRemoteService()
EventHandler.info(this, "Remote Actors Service initialized")
}
super.onLoad()
}
abstract override def onUnload() {
EventHandler.info(this, "Shutting down Remote Actors Service")
Actor.remote.shutdown()
if (remoteServerThread.isAlive) remoteServerThread.join(1000)
EventHandler.info(this, "Remote Actors Service has been shut down")
super.onUnload()
}
}

View file

@ -4,24 +4,22 @@
package akka.remote.netty
import akka.dispatch.{ ActorPromise, DefaultPromise, Promise, Future }
import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings }
import akka.dispatch.{ActorPromise, DefaultPromise, Promise}
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
import akka.remote.protocol.RemoteProtocol._
import akka.serialization.RemoteActorSerialization
import akka.serialization.RemoteActorSerialization._
import akka.remoteinterface._
import akka.actor.{
PoisonPill,
LocalActorRef,
Actor,
RemoteActorRef,
ActorRef,
IllegalActorStateException,
RemoteActorSystemMessage,
uuidFrom,
Uuid,
Death,
LifeCycleMessage
PoisonPill,
Actor,
RemoteActorRef,
ActorRef,
IllegalActorStateException,
RemoteActorSystemMessage,
uuidFrom,
Uuid,
LifeCycleMessage
}
import akka.actor.Actor._
import akka.config.Config
@ -30,23 +28,22 @@ import akka.util._
import akka.event.EventHandler
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup, ChannelGroupFuture}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.bootstrap.{ServerBootstrap, ClientBootstrap}
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import org.jboss.netty.handler.timeout.{ReadTimeoutHandler, ReadTimeoutException}
import org.jboss.netty.handler.execution.{OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler}
import org.jboss.netty.util.{TimerTask, Timeout, HashedWheelTimer}
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import java.util.concurrent._
import akka.AkkaException
@ -66,7 +63,8 @@ object RemoteEncoder {
}
}
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement
trait NettyRemoteClientModule extends RemoteClientModule {
self: ListenerManagement
private val remoteClients = new HashMap[Address, RemoteClient]
private val remoteActors = new Index[Address, Uuid]
private val lock = new ReadWriteGuard
@ -82,7 +80,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef))
private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient T): T = {
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient T): T = {
// loader.foreach(MessageSerializer.setClassLoader(_))
val key = Address(address)
lock.readLock.lock
@ -94,7 +92,8 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
lock.writeLock.lock //Lock upgrade, not supported natively
try {
try {
remoteClients.get(key) match { //Recheck for addition, race between upgrades
remoteClients.get(key) match {
//Recheck for addition, race between upgrades
case Some(client) client //If already populated by other writer
case None //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
@ -102,24 +101,30 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
remoteClients += key -> client
client
}
} finally { lock.readLock.lock } //downgrade
} finally { lock.writeLock.unlock }
} finally {
lock.readLock.lock
} //downgrade
} finally {
lock.writeLock.unlock
}
}
fun(c)
} finally { lock.readLock.unlock }
} finally {
lock.readLock.unlock
}
}
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(Address(address)) match {
case Some(client) client.shutdown()
case None false
case None false
}
}
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
remoteClients.get(Address(address)) match {
case Some(client) client.connect(reconnectIfAlreadyConnected = true)
case None false
case None false
}
}
@ -133,7 +138,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
}
def shutdownRemoteClients() = lock withWriteGuard {
remoteClients.foreach({ case (addr, client) client.shutdown() })
remoteClients.foreach({
case (addr, client) client.shutdown()
})
remoteClients.clear()
}
}
@ -143,9 +150,9 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
* ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that
* reuses an already established connection.
*/
abstract class RemoteClient private[akka] (
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
abstract class RemoteClient private[akka](
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val useTransactionLog = config.getBool("akka.cluster.client.buffering.retry-message-send-on-failure", true)
val transactionLogCapacity = config.getInt("akka.cluster.client.buffering.capacity", -1)
@ -189,13 +196,13 @@ abstract class RemoteClient private[akka] (
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send[T](
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef): Option[Promise[T]] =
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef): Option[Promise[T]] =
send(createRemoteMessageProtocolBuilder(
Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build,
senderFuture)
@ -204,8 +211,8 @@ abstract class RemoteClient private[akka] (
* Sends the message across the wire
*/
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
request: RemoteMessageProtocol,
senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
if (isRunning) {
EventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request))
@ -266,20 +273,23 @@ abstract class RemoteClient private[akka] (
}
}
private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log
private[remote] def sendPendingRequests() = pendingRequests synchronized {
// ensure only one thread at a time can flush the log
val nrOfMessages = pendingRequests.size
if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages)
var pendingRequest = pendingRequests.peek
while (pendingRequest ne null) {
val (isOneWay, futureUuid, message) = pendingRequest
if (isOneWay) { // sendOneWay
if (isOneWay) {
// sendOneWay
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(message, future.getCause, module, remoteAddress))
throw future.getCause
}
} else { // sendRequestReply
} else {
// sendRequestReply
val future = currentChannel.write(RemoteEncoder.encode(message))
future.awaitUninterruptibly()
if (future.isCancelled) futures.remove(futureUuid) // Clean up future
@ -300,9 +310,10 @@ abstract class RemoteClient private[akka] (
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveRemoteClient private[akka] (
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: ( Any) Unit) extends RemoteClient(module, remoteAddress) {
class ActiveRemoteClient private[akka](
module: NettyRemoteClientModule, remoteAddress: InetSocketAddress,
val loader: Option[ClassLoader] = None, notifyListenersFun: ( Any) Unit) extends RemoteClient(module, remoteAddress) {
import RemoteClientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@ -318,6 +329,7 @@ class ActiveRemoteClient private[akka] (
private var reconnectionTimeWindowStart = 0L
def notifyListeners(msg: Any): Unit = notifyListenersFun(msg)
def currentChannel = connection.getChannel
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = {
@ -330,15 +342,18 @@ class ActiveRemoteClient private[akka] (
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress))
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
EventHandler.error(connection.getCause, "Remote client connection to [%s] has failed".format(remoteAddress), this)
false
} else {
//Send cookie
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SECURE_COOKIE.nonEmpty)
@ -365,12 +380,16 @@ class ActiveRemoteClient private[akka] (
} match {
case true true
case false if reconnectIfAlreadyConnected
EventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress))
openChannels.remove(connection.getChannel)
connection.getChannel.close
connection = bootstrap.connect(remoteAddress)
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
EventHandler.error(connection.getCause, "Reconnection to [%s] has failed".format(remoteAddress),this)
false
} else {
//Send cookie
@ -387,6 +406,8 @@ class ActiveRemoteClient private[akka] (
//Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
def shutdown() = runSwitch switchOff {
EventHandler.info(this, "Shutting down [%s]".format(name))
notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop()
timer = null
@ -396,6 +417,8 @@ class ActiveRemoteClient private[akka] (
bootstrap = null
connection = null
pendingRequests.clear()
EventHandler.info(this, "[%s] has been shut down".format(name))
}
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
@ -403,7 +426,11 @@ class ActiveRemoteClient private[akka] (
reconnectionTimeWindowStart = System.currentTimeMillis
true
} else {
/*Time left > 0*/ (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
val timeLeft = (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
if (timeLeft) {
EventHandler.info(this, "Will try to reconnect to remote server for another [%s] milliseconds".format(timeLeft))
}
timeLeft
}
}
@ -414,12 +441,12 @@ class ActiveRemoteClient private[akka] (
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveRemoteClientPipelineFactory(
name: String,
futures: ConcurrentMap[Uuid, Promise[_]],
bootstrap: ClientBootstrap,
remoteAddress: InetSocketAddress,
timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
name: String,
futures: ConcurrentMap[Uuid, Promise[_]],
bootstrap: ClientBootstrap,
remoteAddress: InetSocketAddress,
timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit)
@ -429,7 +456,7 @@ class ActiveRemoteClientPipelineFactory(
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ (Nil, Nil)
case _ (Nil, Nil)
}
val remoteClient = new ActiveRemoteClientHandler(name, futures, bootstrap, remoteAddress, timer, client)
@ -443,12 +470,12 @@ class ActiveRemoteClientPipelineFactory(
*/
@ChannelHandler.Sharable
class ActiveRemoteClientHandler(
val name: String,
val futures: ConcurrentMap[Uuid, Promise[_]],
val bootstrap: ClientBootstrap,
val remoteAddress: InetSocketAddress,
val timer: HashedWheelTimer,
val client: ActiveRemoteClient)
val name: String,
val futures: ConcurrentMap[Uuid, Promise[_]],
val bootstrap: ClientBootstrap,
val remoteAddress: InetSocketAddress,
val timer: HashedWheelTimer,
val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler {
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
@ -457,11 +484,16 @@ class ActiveRemoteClientHandler(
case arp: AkkaRemoteProtocol if arp.hasInstruction
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN spawn { client.module.shutdownClientConnection(remoteAddress) }
case CommandType.SHUTDOWN spawn {
client.module.shutdownClientConnection(remoteAddress)
}
}
case arp: AkkaRemoteProtocol if arp.hasMessage
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
EventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]".format(reply))
EventHandler.debug(this, "Trying to map back to future: %s".format(replyUuid))
futures.remove(replyUuid).asInstanceOf[Promise[Any]] match {
case null =>
client.notifyListeners(RemoteClientError(new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist"), client.module, client.remoteAddress))
@ -495,13 +527,16 @@ class ActiveRemoteClientHandler(
}
}
}, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn { client.module.shutdownClientConnection(remoteAddress) }
} else spawn {
client.module.shutdownClientConnection(remoteAddress)
}
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
try {
if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress))
client.resetReconnectionTimeWindow
} catch {
case e: Throwable
@ -513,12 +548,20 @@ class ActiveRemoteClientHandler(
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress))
EventHandler.debug(this, "Remote client disconnected from [%s]".format(ctx.getChannel.getRemoteAddress))
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
if (event.getCause ne null)
EventHandler.error(event.getCause, "Unexpected exception from downstream in remote client", this)
else
EventHandler.error(this, "Unexpected exception from downstream in remote client: %s".format(event))
event.getCause match {
case e: ReadTimeoutException
spawn { client.module.shutdownClientConnection(remoteAddress) }
spawn {
client.module.shutdownClientConnection(remoteAddress)
}
case e
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
event.getChannel.close //FIXME Is this the correct behavior?
@ -553,17 +596,18 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(
actorAddress: String,
timeout: Long,
host: String,
port: Int,
loader: Option[ClassLoader]): ActorRef = {
actorAddress: String,
timeout: Long,
host: String,
port: Int,
loader: Option[ClassLoader]): ActorRef = {
val homeInetSocketAddress = this.address
if (optimizeLocalScoped_?) {
if ((host == homeInetSocketAddress.getAddress.getHostAddress ||
host == homeInetSocketAddress.getHostName) &&
port == homeInetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals?
port == homeInetSocketAddress.getPort) {
//TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(actorAddress, actorAddress)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
@ -578,7 +622,9 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
}
class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
import RemoteServerSettings._
val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host, port)
@ -629,14 +675,14 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
}
}
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
import RemoteServerSettings._
trait NettyRemoteServerModule extends RemoteServerModule {
self: RemoteModule
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def address = currentServer.get match {
case Some(server) server.address
case None ReflectiveAccess.RemoteModule.configDefaultAddress
case None ReflectiveAccess.RemoteModule.configDefaultAddress
}
def name = currentServer.get match {
@ -653,6 +699,8 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try {
_isRunning switchOn {
EventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port))
currentServer.set(Some(new NettyRemoteServer(this, _hostname, _port, loader)))
}
} catch {
@ -665,8 +713,11 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
def shutdownServerModule() = guard withGuard {
_isRunning switchOff {
currentServer.getAndSet(None) foreach { instance
instance.shutdown()
currentServer.getAndSet(None) foreach {
instance
EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port))
instance.shutdown()
}
}
}
@ -710,7 +761,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
* Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actorRef: ActorRef): Unit = guard withGuard {
if (_isRunning.isOn) {
EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid))
actors.remove(actorRef.address, actorRef)
actorsByUuid.remove(actorRef.uuid.toString, actorRef)
}
@ -722,7 +776,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id))
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
else {
val actorRef = actors get id
@ -738,7 +795,10 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(id: String): Unit = {
if (_isRunning.isOn) {
EventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id))
actorsFactories.remove(id)
}
}
@ -748,11 +808,12 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val executor: ExecutionHandler,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
val name: String,
val openChannels: ChannelGroup,
val executor: ExecutionHandler,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServerSettings._
def getPipeline: ChannelPipeline = {
@ -762,7 +823,7 @@ class RemoteServerPipelineFactory(
val protobufEnc = new ProtobufEncoder
val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ (Nil, Nil)
case _ (Nil, Nil)
}
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
@ -802,10 +863,11 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si
*/
@ChannelHandler.Sharable
class RemoteServerHandler(
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import RemoteServerSettings._
// applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
@ -822,7 +884,7 @@ class RemoteServerHandler(
} else if (!future.isSuccess) {
val socketAddress = future.getChannel.getRemoteAddress match {
case i: InetSocketAddress Some(i)
case _ None
case _ None
}
server.notifyListeners(RemoteServerWriteFailed(payload, future.getCause, server, socketAddress))
}
@ -838,6 +900,8 @@ class RemoteServerHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
EventHandler.debug(this,"Remote client [%s] connected to [%s]".format(clientAddress, server.name))
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
}
@ -845,12 +909,18 @@ class RemoteServerHandler(
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
EventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name))
// stop all session actors
for (
map Option(sessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
) {
try { actor ! PoisonPill } catch { case e: Exception }
try {
actor ! PoisonPill
} catch {
case e: Exception EventHandler.error(e, "Couldn't stop %s".format(actor),this)
}
}
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
@ -858,6 +928,8 @@ class RemoteServerHandler(
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
EventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name),this)
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
}
@ -873,6 +945,8 @@ class RemoteServerHandler(
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
EventHandler.error(event.getCause, "Unexpected exception from remote downstream", this)
event.getChannel.close
server.notifyListeners(RemoteServerError(event.getCause, server))
}
@ -880,7 +954,7 @@ class RemoteServerHandler(
private def getClientAddress(ctx: ChannelHandlerContext): Option[InetSocketAddress] =
ctx.getChannel.getRemoteAddress match {
case inet: InetSocketAddress Some(inet)
case _ None
case _ None
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try {
@ -894,8 +968,13 @@ class RemoteServerHandler(
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
val actorInfo = request.getActorInfo
EventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid))
val actorRef =
try { createActor(actorInfo, channel) } catch {
try {
createActor(actorInfo, channel)
} catch {
case e: SecurityException
EventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request))
@ -908,7 +987,8 @@ class RemoteServerHandler(
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
message match { // first match on system messages
message match {
// first match on system messages
case RemoteActorSystemMessage.Stop
if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor")
else actorRef.stop()
@ -923,22 +1003,22 @@ class RemoteServerHandler(
request.getActorInfo.getTimeout,
new ActorPromise(request.getActorInfo.getTimeout).
onComplete(_.value.get match {
case Left(exception) write(channel, createErrorReplyMessage(exception, request))
case r: Right[_,_]
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getAddress,
actorInfo.getTimeout,
r.asInstanceOf[Either[Throwable,Any]],
isOneWay = true,
Some(actorRef))
case Left(exception) write(channel, createErrorReplyMessage(exception, request))
case r: Right[_, _]
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Some(actorRef),
Right(request.getUuid),
actorInfo.getAddress,
actorInfo.getTimeout,
r.asInstanceOf[Either[Throwable, Any]],
isOneWay = true,
Some(actorRef))
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, RemoteEncoder.encode(messageBuilder.build))
}))
write(channel, RemoteEncoder.encode(messageBuilder.build))
}))
}
}
@ -988,7 +1068,7 @@ class RemoteServerHandler(
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null null
case map map get id
case map map get id
}
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {

View file

@ -21,6 +21,7 @@ import java.net.InetSocketAddress
import com.google.protobuf.ByteString
import com.eaio.uuid.UUID
import akka.event.EventHandler
/**
* Module for local actor serialization.
@ -142,6 +143,8 @@ object ActorSerialization {
overriddenUuid: Option[UUID],
loader: Option[ClassLoader]): ActorRef = {
EventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol))
val lifeCycle =
if (protocol.hasLifeCycle) {
protocol.getLifeCycle.getLifeCycle match {
@ -243,11 +246,17 @@ object RemoteActorSerialization {
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
RemoteActorRef(
EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol))
val ref = RemoteActorRef(
JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress],
protocol.getAddress,
protocol.getTimeout,
loader)
EventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid))
ref
}
/**
@ -263,6 +272,9 @@ object RemoteActorSerialization {
case _
ReflectiveAccess.RemoteModule.configDefaultAddress
}
EventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress))
RemoteActorRefProtocol.newBuilder
.setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress)))
.setAddress(actor.address)

View file

@ -228,11 +228,11 @@ A child actor can tell the supervising actor to unlink him by sending him the 'U
.. code-block:: scala
if (supervisor.isDefined) supervisor.get ! Unlink(this)
if (supervisor.isDefined) supervisor.get ! Unlink(self)
// Or shorter using 'foreach':
supervisor.foreach(_ ! Unlink(this))
supervisor.foreach(_ ! Unlink(self))
The supervising actor's side of things
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^