Use Config in benchmarks
This commit is contained in:
parent
119ceb0278
commit
2721a879c2
12 changed files with 77 additions and 115 deletions
|
|
@ -7,7 +7,6 @@ import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
|
|||
import org.junit.runner.RunWith
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Props
|
||||
import java.util.Random
|
||||
import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics
|
||||
|
|
@ -23,9 +22,6 @@ class TellLatencyPerformanceSpec extends PerformanceSpec {
|
|||
.build
|
||||
|
||||
val repeat = 200L * repeatFactor
|
||||
def clientDelayMicros = {
|
||||
System.getProperty("benchmark.clientDelayMicros", "250").toInt
|
||||
}
|
||||
|
||||
var stat: DescriptiveStatistics = _
|
||||
|
||||
|
|
@ -67,19 +63,19 @@ class TellLatencyPerformanceSpec extends PerformanceSpec {
|
|||
val w3 = system.actorOf(new Waypoint(w4))
|
||||
val w2 = system.actorOf(new Waypoint(w3))
|
||||
val w1 = system.actorOf(new Waypoint(w2))
|
||||
Props(new Client(w1, latch, repeatsPerClient, clientDelayMicros, stat)).withDispatcher(clientDispatcher)
|
||||
Props(new Client(w1, latch, repeatsPerClient, clientDelay.toMicros.intValue, stat)).withDispatcher(clientDispatcher)
|
||||
}).toList.map(system.actorOf(_))
|
||||
|
||||
val start = System.nanoTime
|
||||
clients.foreach(_ ! Run)
|
||||
val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS)
|
||||
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
|
||||
val durationNs = (System.nanoTime - start)
|
||||
|
||||
if (!warmup) {
|
||||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, stat)
|
||||
}
|
||||
clients.foreach(_ ! PoisonPill)
|
||||
clients.foreach(_.stop())
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,13 +42,6 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec {
|
|||
val clientDispatcher = createDispatcher("client-dispatcher")
|
||||
//val destinationDispatcher = createDispatcher("destination-dispatcher")
|
||||
|
||||
override def atTermination {
|
||||
super.atTermination()
|
||||
System.out.println("Cleaning up after TellThroughputPerformanceSpec")
|
||||
clientDispatcher.shutdown()
|
||||
//destinationDispatcher.shutdown()
|
||||
}
|
||||
|
||||
val repeat = 30000L * repeatFactor
|
||||
|
||||
"Tell" must {
|
||||
|
|
@ -152,7 +145,7 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec {
|
|||
|
||||
val start = System.nanoTime
|
||||
clients.foreach(_ ! Run)
|
||||
val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS)
|
||||
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
|
||||
val durationNs = (System.nanoTime - start)
|
||||
|
||||
if (!ok) {
|
||||
|
|
@ -180,8 +173,8 @@ class TellThroughput10000PerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_ ! PoisonPill)
|
||||
destinations.foreach(_ ! PoisonPill)
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,13 +22,6 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
|
|||
val clientDispatcher = createDispatcher("client-dispatcher")
|
||||
val destinationDispatcher = createDispatcher("destination-dispatcher")
|
||||
|
||||
override def atTermination {
|
||||
super.atTermination()
|
||||
System.out.println("Cleaning up after TellThroughputComputationPerformanceSpec")
|
||||
clientDispatcher.shutdown()
|
||||
destinationDispatcher.shutdown()
|
||||
}
|
||||
|
||||
val repeat = 500L * repeatFactor
|
||||
|
||||
"Tell" must {
|
||||
|
|
@ -126,7 +119,7 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
|
|||
|
||||
val start = System.nanoTime
|
||||
clients.foreach(_ ! Run)
|
||||
val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS)
|
||||
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
|
||||
val durationNs = (System.nanoTime - start)
|
||||
|
||||
if (!ok) {
|
||||
|
|
@ -154,8 +147,8 @@ class TellThroughputComputationPerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_ ! PoisonPill)
|
||||
destinations.foreach(_ ! PoisonPill)
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -211,7 +204,7 @@ object TellThroughputComputationPerformanceSpec {
|
|||
actor ! Msg
|
||||
sent += 1
|
||||
} else if (received >= repeat) {
|
||||
println("PI: " + pi)
|
||||
//println("PI: " + pi)
|
||||
latch.countDown()
|
||||
}
|
||||
case Run ⇒
|
||||
|
|
|
|||
|
|
@ -22,13 +22,6 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
|
|||
val clientDispatcher = createDispatcher("client-dispatcher")
|
||||
val destinationDispatcher = createDispatcher("destination-dispatcher")
|
||||
|
||||
override def atTermination {
|
||||
super.atTermination()
|
||||
System.out.println("Cleaning up after TellThroughputPerformanceSpec")
|
||||
clientDispatcher.shutdown()
|
||||
destinationDispatcher.shutdown()
|
||||
}
|
||||
|
||||
val repeat = 30000L * repeatFactor
|
||||
|
||||
"Tell" must {
|
||||
|
|
@ -78,36 +71,15 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
|
|||
|
||||
val start = System.nanoTime
|
||||
clients.foreach(_ ! Run)
|
||||
val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS)
|
||||
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
|
||||
val durationNs = (System.nanoTime - start)
|
||||
|
||||
if (!ok) {
|
||||
System.err.println("Destinations: ")
|
||||
destinations.foreach {
|
||||
case l: LocalActorRef ⇒
|
||||
val m = l.underlying.mailbox
|
||||
System.err.println(" -" + l + " mbox(" + m.status + ")" + " containing [" + Stream.continually(m.dequeue()).takeWhile(_ != null).mkString(", ") + "] and has systemMsgs: " + m.hasSystemMessages)
|
||||
}
|
||||
System.err.println("")
|
||||
System.err.println("Clients: ")
|
||||
|
||||
clients.foreach {
|
||||
case l: LocalActorRef ⇒
|
||||
val m = l.underlying.mailbox
|
||||
System.err.println(" -" + l + " mbox(" + m.status + ")" + " containing [" + Stream.continually(m.dequeue()).takeWhile(_ != null).mkString(", ") + "] and has systemMsgs: " + m.hasSystemMessages)
|
||||
}
|
||||
|
||||
val e = clientDispatcher.asInstanceOf[Dispatcher].executorService.get().asInstanceOf[ExecutorServiceDelegate].executor.asInstanceOf[ThreadPoolExecutor]
|
||||
val q = e.getQueue
|
||||
System.err.println("Client Dispatcher: " + e.getActiveCount + " " + Stream.continually(q.poll()).takeWhile(_ != null).mkString(", "))
|
||||
}
|
||||
|
||||
if (!warmup) {
|
||||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_ ! PoisonPill)
|
||||
destinations.foreach(_ ! PoisonPill)
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,13 +26,6 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec {
|
|||
//val clientDispatcher = createDispatcher("client-dispatcher")
|
||||
//val destinationDispatcher = createDispatcher("destination-dispatcher")
|
||||
|
||||
override def atTermination {
|
||||
super.atTermination()
|
||||
System.out.println("Cleaning up after TellThroughputPerformanceSpec")
|
||||
//clientDispatcher.shutdown()
|
||||
//destinationDispatcher.shutdown()
|
||||
}
|
||||
|
||||
val repeat = 30000L * repeatFactor
|
||||
|
||||
"Tell" must {
|
||||
|
|
@ -138,7 +131,7 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec {
|
|||
|
||||
val start = System.nanoTime
|
||||
clients.foreach(_ ! Run)
|
||||
val ok = latch.await((5000000 + 500 * repeat) * timeDilation, TimeUnit.MICROSECONDS)
|
||||
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
|
||||
val durationNs = (System.nanoTime - start)
|
||||
|
||||
if (!ok) {
|
||||
|
|
@ -166,8 +159,8 @@ class TellThroughputSeparateDispatchersPerformanceSpec extends PerformanceSpec {
|
|||
ok must be(true)
|
||||
logMeasurement(numberOfClients, durationNs, repeat)
|
||||
}
|
||||
clients.foreach(_ ! PoisonPill)
|
||||
destinations.foreach(_ ! PoisonPill)
|
||||
clients.foreach(_.stop())
|
||||
destinations.foreach(_.stop())
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
package akka.performance.trading.domain
|
||||
import akka.performance.workbench.BenchmarkConfig
|
||||
|
||||
abstract class Orderbook(val symbol: String) {
|
||||
var bidSide: List[Bid] = Nil
|
||||
|
|
@ -49,7 +50,7 @@ abstract class Orderbook(val symbol: String) {
|
|||
|
||||
object Orderbook {
|
||||
|
||||
val useDummyOrderbook = System.getProperty("benchmark.useDummyOrderbook", "false").toBoolean
|
||||
val useDummyOrderbook = BenchmarkConfig.config.getBoolean("benchmark.useDummyOrderbook")
|
||||
|
||||
def apply(symbol: String, standby: Boolean): Orderbook = (useDummyOrderbook, standby) match {
|
||||
case (false, false) ⇒ new Orderbook(symbol) with NopTradeObserver
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistic
|
|||
import org.junit.runner.RunWith
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Props
|
||||
import akka.performance.trading.domain.Ask
|
||||
import akka.performance.trading.domain.Bid
|
||||
|
|
@ -31,10 +30,6 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
|
|||
var stat: DescriptiveStatistics = _
|
||||
val random: Random = new Random(0)
|
||||
|
||||
def clientDelayMicros = {
|
||||
System.getProperty("benchmark.clientDelayMicros", "250").toInt
|
||||
}
|
||||
|
||||
override def beforeEach() {
|
||||
super.beforeEach()
|
||||
stat = new SynchronizedDescriptiveStatistics
|
||||
|
|
@ -98,12 +93,12 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
|
|||
val start = System.nanoTime
|
||||
val clients = (for (i ← 0 until numberOfClients) yield {
|
||||
val receiver = receivers(i % receivers.size)
|
||||
val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelayMicros)).withDispatcher(clientDispatcher)
|
||||
val props = Props(new Client(receiver, orders, latch, ordersPerClient, clientDelay.toMicros.toInt)).withDispatcher(clientDispatcher)
|
||||
system.actorOf(props)
|
||||
})
|
||||
|
||||
clients.foreach(_ ! "run")
|
||||
val ok = latch.await((5000000L + (clientDelayMicros + 500) * totalNumberOfOrders) * timeDilation, TimeUnit.MICROSECONDS)
|
||||
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
|
||||
val durationNs = (System.nanoTime - start)
|
||||
|
||||
if (!warmup) {
|
||||
|
|
@ -113,7 +108,7 @@ class TradingLatencyPerformanceSpec extends PerformanceSpec {
|
|||
}
|
||||
logMeasurement(numberOfClients, durationNs, stat)
|
||||
}
|
||||
clients.foreach(_ ! PoisonPill)
|
||||
clients.foreach(_.stop())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistic
|
|||
import org.junit.runner.RunWith
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Props
|
||||
import akka.performance.trading.domain.Ask
|
||||
import akka.performance.trading.domain.Bid
|
||||
|
|
@ -96,7 +95,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
|
|||
})
|
||||
|
||||
clients.foreach(_ ! "run")
|
||||
val ok = latch.await((5000000L + 500 * totalNumberOfOrders) * timeDilation, TimeUnit.MICROSECONDS)
|
||||
val ok = latch.await(maxRunDuration.toMillis, TimeUnit.MILLISECONDS)
|
||||
val durationNs = (System.nanoTime - start)
|
||||
|
||||
if (!warmup) {
|
||||
|
|
@ -106,7 +105,7 @@ class TradingThroughputPerformanceSpec extends PerformanceSpec {
|
|||
}
|
||||
logMeasurement(numberOfClients, durationNs, totalNumberOfOrders)
|
||||
}
|
||||
clients.foreach(_ ! PoisonPill)
|
||||
clients.foreach(_.stop())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,9 +41,10 @@ class FileBenchResultRepository extends BenchResultRepository {
|
|||
private val statsByName = MutableMap[String, Seq[Stats]]()
|
||||
private val baselineStats = MutableMap[Key, Stats]()
|
||||
private val historicalStats = MutableMap[Key, Seq[Stats]]()
|
||||
private val serDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/ser"
|
||||
private def resultDir = BenchmarkConfig.config.getString("benchmark.resultDir")
|
||||
private val serDir = resultDir + "/ser"
|
||||
private def serDirExists: Boolean = new File(serDir).exists
|
||||
private val htmlDir = System.getProperty("benchmark.resultDir", "target/benchmark") + "/html"
|
||||
private val htmlDir = resultDir + "/html"
|
||||
private def htmlDirExists: Boolean = new File(htmlDir).exists
|
||||
protected val maxHistorical = 7
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
package akka.performance.workbench
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object BenchmarkConfig {
|
||||
private val benchmarkConfig = ConfigFactory.parseString("""
|
||||
benchmark {
|
||||
longRunning = false
|
||||
minClients = 1
|
||||
maxClients = 4
|
||||
repeatFactor = 2
|
||||
timeDilation = 1
|
||||
maxRunDuration = 10 seconds
|
||||
clientDelay = 250000 nanoseconds
|
||||
logResult = true
|
||||
resultDir = "target/benchmark"
|
||||
useDummyOrderbook = false
|
||||
}
|
||||
""")
|
||||
private val longRunningBenchmarkConfig = ConfigFactory.parseString("""
|
||||
benchmark {
|
||||
longRunning = true
|
||||
maxClients = 48
|
||||
repeatFactor = 150
|
||||
maxRunDuration = 120 seconds
|
||||
useDummyOrderbook = true
|
||||
}
|
||||
""").withFallback(benchmarkConfig)
|
||||
|
||||
def config = if (System.getProperty("benchmark.longRunning") == "true")
|
||||
longRunningBenchmarkConfig else benchmarkConfig
|
||||
|
||||
}
|
||||
|
|
@ -1,33 +1,25 @@
|
|||
package akka.performance.workbench
|
||||
|
||||
import scala.collection.immutable.TreeMap
|
||||
|
||||
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import akka.actor.simpleName
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.ActorSystem
|
||||
import akka.util.Duration
|
||||
import com.typesafe.config.Config
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
trait PerformanceSpec extends AkkaSpec with BeforeAndAfterEach {
|
||||
abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends AkkaSpec(cfg) with BeforeAndAfterEach {
|
||||
|
||||
def isBenchmark() = System.getProperty("benchmark") == "true"
|
||||
|
||||
def minClients() = System.getProperty("benchmark.minClients", "1").toInt;
|
||||
|
||||
def maxClients() = {
|
||||
val default = if (isBenchmark) "48" else "4"
|
||||
System.getProperty("benchmark.maxClients", default).toInt;
|
||||
}
|
||||
|
||||
def repeatFactor() = {
|
||||
val defaultRepeatFactor = if (isBenchmark) "150" else "2"
|
||||
System.getProperty("benchmark.repeatFactor", defaultRepeatFactor).toInt
|
||||
}
|
||||
|
||||
def timeDilation() = {
|
||||
System.getProperty("benchmark.timeDilation", "1").toLong
|
||||
}
|
||||
def config = system.settings.config
|
||||
def isLongRunningBenchmark() = config.getBoolean("benchmark.longRunning")
|
||||
def minClients() = config.getInt("benchmark.minClients")
|
||||
def maxClients() = config.getInt("benchmark.maxClients")
|
||||
def repeatFactor() = config.getInt("benchmark.repeatFactor")
|
||||
def timeDilation() = config.getLong("benchmark.timeDilation")
|
||||
def maxRunDuration() = Duration(config.getMilliseconds("benchmark.maxRunDuration"), TimeUnit.MILLISECONDS)
|
||||
def clientDelay = Duration(config.getNanoseconds("benchmark.clientDelay"), TimeUnit.NANOSECONDS)
|
||||
|
||||
val resultRepository = BenchResultRepository()
|
||||
lazy val report = new Report(system, resultRepository, compareResultWith)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ class Report(
|
|||
resultRepository: BenchResultRepository,
|
||||
compareResultWith: Option[String] = None) {
|
||||
|
||||
private def doLog = System.getProperty("benchmark.logResult", "true").toBoolean
|
||||
private def doLog = system.settings.config.getBoolean("benchmark.logResult")
|
||||
val log = Logging(system, "Report")
|
||||
|
||||
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
|
||||
|
|
@ -189,12 +189,8 @@ class Report(
|
|||
|
||||
val sb = new StringBuilder
|
||||
|
||||
sb.append("Benchmark properties:")
|
||||
import scala.collection.JavaConversions._
|
||||
val propNames: Seq[String] = System.getProperties.propertyNames.toSeq.map(_.toString)
|
||||
for (name ← propNames if name.startsWith("benchmark")) {
|
||||
sb.append("\n ").append(name).append("=").append(System.getProperty(name))
|
||||
}
|
||||
sb.append("Benchmark properties:\n")
|
||||
sb.append(system.settings.config.getConfig("benchmark").root.render)
|
||||
sb.append("\n")
|
||||
|
||||
sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion)
|
||||
|
|
@ -215,16 +211,15 @@ class Report(
|
|||
append(")").append(" MB")
|
||||
sb.append("\n")
|
||||
|
||||
val args = runtime.getInputArguments.filterNot(_.contains("classpath")).mkString("\n ")
|
||||
import scala.collection.JavaConverters._
|
||||
val args = runtime.getInputArguments.asScala.filterNot(_.contains("classpath")).mkString("\n ")
|
||||
sb.append("Args:\n ").append(args)
|
||||
sb.append("\n")
|
||||
|
||||
sb.append("Akka version: ").append(system.settings.ConfigVersion)
|
||||
sb.append("\n")
|
||||
sb.append("Akka config:")
|
||||
for ((key, value) ← system.settings.config.root) {
|
||||
sb.append("\n ").append(key).append("=").append(value)
|
||||
}
|
||||
sb.append("Akka config:\n")
|
||||
sb.append(system.settings.toString)
|
||||
|
||||
sb.toString
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue