add output for trend plots
* use Jenkins Plot plugin * output csv values to log and then use post build step to turn those into csv files that the plot plugin understands
This commit is contained in:
parent
263e3a3b99
commit
c01b3e8fed
5 changed files with 104 additions and 21 deletions
|
|
@ -7,9 +7,7 @@ import java.util.concurrent.CyclicBarrier
|
|||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicLongArray
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor._
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
|
|
@ -23,6 +21,7 @@ import com.typesafe.config.ConfigFactory
|
|||
import io.aeron.Aeron
|
||||
import io.aeron.driver.MediaDriver
|
||||
import org.HdrHistogram.Histogram
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
object AeronStreamLatencySpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
|
|
@ -73,6 +72,8 @@ abstract class AeronStreamLatencySpec
|
|||
val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamLatencySpec.totalMessagesFactor")
|
||||
val repeatCount = system.settings.config.getInt("akka.test.AeronStreamLatencySpec.repeatCount")
|
||||
|
||||
var plots = LatencyPlots()
|
||||
|
||||
val aeron = {
|
||||
val ctx = new Aeron.Context
|
||||
val driver = MediaDriver.launchEmbedded()
|
||||
|
|
@ -104,10 +105,15 @@ abstract class AeronStreamLatencySpec
|
|||
|
||||
override def afterAll(): Unit = {
|
||||
reporterExecutor.shutdown()
|
||||
runOn(first) {
|
||||
println(plots.plot50.csv(system.name + "50"))
|
||||
println(plots.plot90.csv(system.name + "90"))
|
||||
println(plots.plot99.csv(system.name + "99"))
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = {
|
||||
def printTotal(testName: String, payloadSize: Long, histogram: Histogram, lastRepeat: Boolean): Unit = {
|
||||
import scala.collection.JavaConverters._
|
||||
val percentiles = histogram.percentiles(5)
|
||||
def percentile(p: Double): Double =
|
||||
|
|
@ -122,6 +128,14 @@ abstract class AeronStreamLatencySpec
|
|||
f"99%%ile: ${percentile(99.0)}%.0f µs, ")
|
||||
println("Histogram of RTT latencies in microseconds.")
|
||||
histogram.outputPercentileDistribution(System.out, 1000.0)
|
||||
|
||||
// only use the last repeat for the plots
|
||||
if (lastRepeat) {
|
||||
plots = plots.copy(
|
||||
plot50 = plots.plot50.add(testName, percentile(50.0)),
|
||||
plot90 = plots.plot90.add(testName, percentile(90.0)),
|
||||
plot99 = plots.plot99.add(testName, percentile(99.0)))
|
||||
}
|
||||
}
|
||||
|
||||
val scenarios = List(
|
||||
|
|
@ -159,6 +173,7 @@ abstract class AeronStreamLatencySpec
|
|||
val rep = reporter(testName)
|
||||
val barrier = new CyclicBarrier(2)
|
||||
val count = new AtomicInteger
|
||||
val lastRepeat = new AtomicBoolean(false)
|
||||
Source.fromGraph(new AeronSource(channel(first), aeron))
|
||||
.runForeach { bytes ⇒
|
||||
if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message")
|
||||
|
|
@ -167,7 +182,7 @@ abstract class AeronStreamLatencySpec
|
|||
val d = System.nanoTime() - sendTimes.get(c - 1)
|
||||
histogram.recordValue(d)
|
||||
if (c == totalMessages) {
|
||||
printTotal(testName, bytes.length, histogram)
|
||||
printTotal(testName, bytes.length, histogram, lastRepeat.get)
|
||||
barrier.await() // this is always the last party
|
||||
}
|
||||
}
|
||||
|
|
@ -175,6 +190,7 @@ abstract class AeronStreamLatencySpec
|
|||
for (n ← 1 to repeat) {
|
||||
histogram.reset()
|
||||
count.set(0)
|
||||
lastRepeat.set(n == repeat)
|
||||
|
||||
Source(1 to totalMessages)
|
||||
.throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
|
||||
|
|
|
|||
|
|
@ -68,6 +68,7 @@ object AeronStreamMaxThroughputSpec extends MultiNodeConfig {
|
|||
acc
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class AeronStreamMaxThroughputSpecMultiJvmNode1 extends AeronStreamMaxThroughputSpec
|
||||
|
|
@ -81,6 +82,8 @@ abstract class AeronStreamMaxThroughputSpec
|
|||
|
||||
val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor")
|
||||
|
||||
var plot = PlotResult()
|
||||
|
||||
val aeron = {
|
||||
val ctx = new Aeron.Context
|
||||
val driver = MediaDriver.launchEmbedded()
|
||||
|
|
@ -114,15 +117,20 @@ abstract class AeronStreamMaxThroughputSpec
|
|||
|
||||
override def afterAll(): Unit = {
|
||||
reporterExecutor.shutdown()
|
||||
runOn(second) {
|
||||
println(plot.csv(system.name))
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
def printTotal(testName: String, total: Long, startTime: Long, payloadSize: Long): Unit = {
|
||||
val d = (System.nanoTime - startTime).nanos.toMillis
|
||||
val throughput = 1000.0 * total / d
|
||||
println(f"=== AeronStreamMaxThroughput $testName: " +
|
||||
f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s, " +
|
||||
f"${throughput}%.03g msg/s, ${throughput * payloadSize}%.03g bytes/s, " +
|
||||
s"payload size $payloadSize, " +
|
||||
s"$d ms to deliver $total messages")
|
||||
plot = plot.add(testName, throughput * payloadSize / 1024 / 1024)
|
||||
}
|
||||
|
||||
val scenarios = List(
|
||||
|
|
|
|||
|
|
@ -93,11 +93,11 @@ object LatencySpec extends MultiNodeConfig {
|
|||
}
|
||||
|
||||
def receiverProps(reporter: RateReporter, settings: TestSettings, totalMessages: Int,
|
||||
sendTimes: AtomicLongArray, histogram: Histogram): Props =
|
||||
Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram))
|
||||
sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef): Props =
|
||||
Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram, plotsRef))
|
||||
|
||||
class Receiver(reporter: RateReporter, settings: TestSettings, totalMessages: Int,
|
||||
sendTimes: AtomicLongArray, histogram: Histogram) extends Actor {
|
||||
sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef) extends Actor {
|
||||
import settings._
|
||||
|
||||
var count = 0
|
||||
|
|
@ -130,6 +130,12 @@ object LatencySpec extends MultiNodeConfig {
|
|||
f"99%%ile: ${percentile(99.0)}%.0f µs, ")
|
||||
println("Histogram of RTT latencies in microseconds.")
|
||||
histogram.outputPercentileDistribution(System.out, 1000.0)
|
||||
|
||||
val plots = LatencyPlots(
|
||||
PlotResult().add(testName, percentile(50.0)),
|
||||
PlotResult().add(testName, percentile(90.0)),
|
||||
PlotResult().add(testName, percentile(99.0)))
|
||||
plotsRef ! plots
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -153,6 +159,8 @@ abstract class LatencySpec
|
|||
val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor")
|
||||
val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount")
|
||||
|
||||
var plots = LatencyPlots()
|
||||
|
||||
val aeron = {
|
||||
val ctx = new Aeron.Context
|
||||
val driver = MediaDriver.launchEmbedded()
|
||||
|
|
@ -184,6 +192,11 @@ abstract class LatencySpec
|
|||
|
||||
override def afterAll(): Unit = {
|
||||
reporterExecutor.shutdown()
|
||||
runOn(first) {
|
||||
println(plots.plot50.csv(system.name + "50"))
|
||||
println(plots.plot90.csv(system.name + "90"))
|
||||
println(plots.plot99.csv(system.name + "99"))
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
|
|
@ -226,12 +239,13 @@ abstract class LatencySpec
|
|||
val rep = reporter(testName)
|
||||
|
||||
val echo = identifyEcho()
|
||||
val plotProbe = TestProbe()
|
||||
|
||||
for (n ← 1 to repeat) {
|
||||
echo ! Reset
|
||||
expectMsg(Reset)
|
||||
histogram.reset()
|
||||
val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram))
|
||||
val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram, plotProbe.ref))
|
||||
|
||||
Source(1 to totalMessages)
|
||||
.throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
|
||||
|
|
@ -242,6 +256,14 @@ abstract class LatencySpec
|
|||
|
||||
watch(receiver)
|
||||
expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds)
|
||||
val p = plotProbe.expectMsgType[LatencyPlots]
|
||||
// only use the last repeat for the plots
|
||||
if (n == repeat) {
|
||||
plots = plots.copy(
|
||||
plot50 = plots.plot50.addAll(p.plot50),
|
||||
plot90 = plots.plot90.addAll(p.plot90),
|
||||
plot99 = plots.plot99.addAll(p.plot99))
|
||||
}
|
||||
}
|
||||
|
||||
rep.halt()
|
||||
|
|
|
|||
|
|
@ -87,10 +87,10 @@ object MaxThroughputSpec extends MultiNodeConfig {
|
|||
}
|
||||
}
|
||||
|
||||
def senderProps(target: ActorRef, testSettings: TestSettings): Props =
|
||||
Props(new Sender(target, testSettings))
|
||||
def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef): Props =
|
||||
Props(new Sender(target, testSettings, plotRef))
|
||||
|
||||
class Sender(target: ActorRef, testSettings: TestSettings) extends Actor {
|
||||
class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef) extends Actor {
|
||||
import testSettings._
|
||||
val payload = ("0" * testSettings.payloadSize).getBytes("utf-8")
|
||||
var startTime = 0L
|
||||
|
|
@ -126,15 +126,17 @@ object MaxThroughputSpec extends MultiNodeConfig {
|
|||
|
||||
case EndResult(totalReceived) ⇒
|
||||
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
|
||||
val throughtput = (totalReceived * 1000.0 / took).toInt
|
||||
val throughput = (totalReceived * 1000.0 / took)
|
||||
println(
|
||||
s"=== MaxThroughput ${self.path.name}: " +
|
||||
s"throughtput $throughtput msg/s, " +
|
||||
f"throughput ${throughput}%.03g msg/s, " +
|
||||
f"${throughput * payloadSize}%.03g bytes/s, " +
|
||||
s"dropped ${totalMessages - totalReceived}, " +
|
||||
s"max round-trip $maxRoundTripMillis ms, " +
|
||||
s"burst size $burstSize, " +
|
||||
s"payload size $payloadSize, " +
|
||||
s"$took ms to deliver $totalReceived messages")
|
||||
plotRef ! PlotResult().add(testName, throughput * payloadSize / 1024 / 1024)
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
|
|
@ -176,6 +178,8 @@ abstract class MaxThroughputSpec
|
|||
|
||||
val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor")
|
||||
|
||||
var plot = PlotResult()
|
||||
|
||||
def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong
|
||||
|
||||
override def initialParticipants = roles.size
|
||||
|
|
@ -196,6 +200,9 @@ abstract class MaxThroughputSpec
|
|||
|
||||
override def afterAll(): Unit = {
|
||||
reporterExecutor.shutdown()
|
||||
runOn(first) {
|
||||
println(plot.csv(system.name))
|
||||
}
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
|
|
@ -246,18 +253,25 @@ abstract class MaxThroughputSpec
|
|||
|
||||
runOn(first) {
|
||||
enterBarrier(receiverName + "-started")
|
||||
val ignore = TestProbe()
|
||||
val senders = for (n ← 1 to senderReceiverPairs) yield {
|
||||
val receiver = identifyReceiver(receiverName + n)
|
||||
val snd = system.actorOf(senderProps(receiver, testSettings), testName + "-snd" + n)
|
||||
val p = TestProbe()
|
||||
p.watch(snd)
|
||||
val plotProbe = TestProbe()
|
||||
val snd = system.actorOf(senderProps(receiver, testSettings, plotProbe.ref),
|
||||
testName + "-snd" + n)
|
||||
val terminationProbe = TestProbe()
|
||||
terminationProbe.watch(snd)
|
||||
snd ! Run
|
||||
(snd, p)
|
||||
(snd, terminationProbe, plotProbe)
|
||||
}
|
||||
senders.foreach {
|
||||
case (snd, p) ⇒
|
||||
val t = if (snd == senders.head._1) barrierTimeout else 10.seconds
|
||||
p.expectTerminated(snd, t)
|
||||
case (snd, terminationProbe, plotProbe) ⇒
|
||||
if (snd == senders.head._1) {
|
||||
terminationProbe.expectTerminated(snd, barrierTimeout)
|
||||
val plotResult = plotProbe.expectMsgType[PlotResult]
|
||||
plot = plot.addAll(plotResult)
|
||||
} else
|
||||
terminationProbe.expectTerminated(snd, 10.seconds)
|
||||
}
|
||||
enterBarrier(testName + "-done")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,23 @@
|
|||
/**
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
final case class PlotResult(values: Vector[(String, Number)] = Vector.empty) {
|
||||
|
||||
def add(key: String, value: Number): PlotResult =
|
||||
copy(values = values :+ (key -> value))
|
||||
|
||||
def addAll(p: PlotResult): PlotResult =
|
||||
copy(values ++ p.values)
|
||||
|
||||
def csvLabels: String = values.map(_._1).mkString("\"", "\",\"", "\"")
|
||||
|
||||
def csvValues: String = values.map(_._2).mkString("\"", "\",\"", "\"")
|
||||
|
||||
// this can be split to two lines with bash: cut -d':' -f2,3 | tr ':' $'\n'
|
||||
def csv(name: String): String = s"PLOT_${name}:${csvLabels}:${csvValues}"
|
||||
|
||||
}
|
||||
|
||||
final case class LatencyPlots(plot50: PlotResult = PlotResult(), plot90: PlotResult = PlotResult(), plot99: PlotResult = PlotResult())
|
||||
Loading…
Add table
Add a link
Reference in a new issue