diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index c6cd4a7fe1..33556e5ec2 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -7,9 +7,7 @@ import java.util.concurrent.CyclicBarrier import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLongArray - import scala.concurrent.duration._ - import akka.actor._ import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig @@ -23,6 +21,7 @@ import com.typesafe.config.ConfigFactory import io.aeron.Aeron import io.aeron.driver.MediaDriver import org.HdrHistogram.Histogram +import java.util.concurrent.atomic.AtomicBoolean object AeronStreamLatencySpec extends MultiNodeConfig { val first = role("first") @@ -73,6 +72,8 @@ abstract class AeronStreamLatencySpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamLatencySpec.totalMessagesFactor") val repeatCount = system.settings.config.getInt("akka.test.AeronStreamLatencySpec.repeatCount") + var plots = LatencyPlots() + val aeron = { val ctx = new Aeron.Context val driver = MediaDriver.launchEmbedded() @@ -104,10 +105,15 @@ abstract class AeronStreamLatencySpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(first) { + println(plots.plot50.csv(system.name + "50")) + println(plots.plot90.csv(system.name + "90")) + println(plots.plot99.csv(system.name + "99")) + } super.afterAll() } - def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = { + def printTotal(testName: String, payloadSize: Long, histogram: Histogram, lastRepeat: Boolean): Unit = { import scala.collection.JavaConverters._ val percentiles = histogram.percentiles(5) def percentile(p: Double): Double = @@ -122,6 +128,14 @@ abstract class AeronStreamLatencySpec f"99%%ile: ${percentile(99.0)}%.0f µs, ") println("Histogram of RTT latencies in microseconds.") histogram.outputPercentileDistribution(System.out, 1000.0) + + // only use the last repeat for the plots + if (lastRepeat) { + plots = plots.copy( + plot50 = plots.plot50.add(testName, percentile(50.0)), + plot90 = plots.plot90.add(testName, percentile(90.0)), + plot99 = plots.plot99.add(testName, percentile(99.0))) + } } val scenarios = List( @@ -159,6 +173,7 @@ abstract class AeronStreamLatencySpec val rep = reporter(testName) val barrier = new CyclicBarrier(2) val count = new AtomicInteger + val lastRepeat = new AtomicBoolean(false) Source.fromGraph(new AeronSource(channel(first), aeron)) .runForeach { bytes ⇒ if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") @@ -167,7 +182,7 @@ abstract class AeronStreamLatencySpec val d = System.nanoTime() - sendTimes.get(c - 1) histogram.recordValue(d) if (c == totalMessages) { - printTotal(testName, bytes.length, histogram) + printTotal(testName, bytes.length, histogram, lastRepeat.get) barrier.await() // this is always the last party } } @@ -175,6 +190,7 @@ abstract class AeronStreamLatencySpec for (n ← 1 to repeat) { histogram.reset() count.set(0) + lastRepeat.set(n == repeat) Source(1 to totalMessages) .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 22b42bbbd0..1115a2a552 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -68,6 +68,7 @@ object AeronStreamMaxThroughputSpec extends MultiNodeConfig { acc } } + } class AeronStreamMaxThroughputSpecMultiJvmNode1 extends AeronStreamMaxThroughputSpec @@ -81,6 +82,8 @@ abstract class AeronStreamMaxThroughputSpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor") + var plot = PlotResult() + val aeron = { val ctx = new Aeron.Context val driver = MediaDriver.launchEmbedded() @@ -114,15 +117,20 @@ abstract class AeronStreamMaxThroughputSpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(second) { + println(plot.csv(system.name)) + } super.afterAll() } def printTotal(testName: String, total: Long, startTime: Long, payloadSize: Long): Unit = { val d = (System.nanoTime - startTime).nanos.toMillis + val throughput = 1000.0 * total / d println(f"=== AeronStreamMaxThroughput $testName: " + - f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s, " + + f"${throughput}%.03g msg/s, ${throughput * payloadSize}%.03g bytes/s, " + s"payload size $payloadSize, " + s"$d ms to deliver $total messages") + plot = plot.add(testName, throughput * payloadSize / 1024 / 1024) } val scenarios = List( diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index 3cc9fca890..322bc12735 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -93,11 +93,11 @@ object LatencySpec extends MultiNodeConfig { } def receiverProps(reporter: RateReporter, settings: TestSettings, totalMessages: Int, - sendTimes: AtomicLongArray, histogram: Histogram): Props = - Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram)) + sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef): Props = + Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram, plotsRef)) class Receiver(reporter: RateReporter, settings: TestSettings, totalMessages: Int, - sendTimes: AtomicLongArray, histogram: Histogram) extends Actor { + sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef) extends Actor { import settings._ var count = 0 @@ -130,6 +130,12 @@ object LatencySpec extends MultiNodeConfig { f"99%%ile: ${percentile(99.0)}%.0f µs, ") println("Histogram of RTT latencies in microseconds.") histogram.outputPercentileDistribution(System.out, 1000.0) + + val plots = LatencyPlots( + PlotResult().add(testName, percentile(50.0)), + PlotResult().add(testName, percentile(90.0)), + PlotResult().add(testName, percentile(99.0))) + plotsRef ! plots } } @@ -153,6 +159,8 @@ abstract class LatencySpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor") val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount") + var plots = LatencyPlots() + val aeron = { val ctx = new Aeron.Context val driver = MediaDriver.launchEmbedded() @@ -184,6 +192,11 @@ abstract class LatencySpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(first) { + println(plots.plot50.csv(system.name + "50")) + println(plots.plot90.csv(system.name + "90")) + println(plots.plot99.csv(system.name + "99")) + } super.afterAll() } @@ -226,12 +239,13 @@ abstract class LatencySpec val rep = reporter(testName) val echo = identifyEcho() + val plotProbe = TestProbe() for (n ← 1 to repeat) { echo ! Reset expectMsg(Reset) histogram.reset() - val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram)) + val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram, plotProbe.ref)) Source(1 to totalMessages) .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) @@ -242,6 +256,14 @@ abstract class LatencySpec watch(receiver) expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds) + val p = plotProbe.expectMsgType[LatencyPlots] + // only use the last repeat for the plots + if (n == repeat) { + plots = plots.copy( + plot50 = plots.plot50.addAll(p.plot50), + plot90 = plots.plot90.addAll(p.plot90), + plot99 = plots.plot99.addAll(p.plot99)) + } } rep.halt() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index e56a6f1ec2..2aedd70c94 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -87,10 +87,10 @@ object MaxThroughputSpec extends MultiNodeConfig { } } - def senderProps(target: ActorRef, testSettings: TestSettings): Props = - Props(new Sender(target, testSettings)) + def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef): Props = + Props(new Sender(target, testSettings, plotRef)) - class Sender(target: ActorRef, testSettings: TestSettings) extends Actor { + class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef) extends Actor { import testSettings._ val payload = ("0" * testSettings.payloadSize).getBytes("utf-8") var startTime = 0L @@ -126,15 +126,17 @@ object MaxThroughputSpec extends MultiNodeConfig { case EndResult(totalReceived) ⇒ val took = NANOSECONDS.toMillis(System.nanoTime - startTime) - val throughtput = (totalReceived * 1000.0 / took).toInt + val throughput = (totalReceived * 1000.0 / took) println( s"=== MaxThroughput ${self.path.name}: " + - s"throughtput $throughtput msg/s, " + + f"throughput ${throughput}%.03g msg/s, " + + f"${throughput * payloadSize}%.03g bytes/s, " + s"dropped ${totalMessages - totalReceived}, " + s"max round-trip $maxRoundTripMillis ms, " + s"burst size $burstSize, " + s"payload size $payloadSize, " + s"$took ms to deliver $totalReceived messages") + plotRef ! PlotResult().add(testName, throughput * payloadSize / 1024 / 1024) context.stop(self) } @@ -176,6 +178,8 @@ abstract class MaxThroughputSpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor") + var plot = PlotResult() + def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong override def initialParticipants = roles.size @@ -196,6 +200,9 @@ abstract class MaxThroughputSpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(first) { + println(plot.csv(system.name)) + } super.afterAll() } @@ -246,18 +253,25 @@ abstract class MaxThroughputSpec runOn(first) { enterBarrier(receiverName + "-started") + val ignore = TestProbe() val senders = for (n ← 1 to senderReceiverPairs) yield { val receiver = identifyReceiver(receiverName + n) - val snd = system.actorOf(senderProps(receiver, testSettings), testName + "-snd" + n) - val p = TestProbe() - p.watch(snd) + val plotProbe = TestProbe() + val snd = system.actorOf(senderProps(receiver, testSettings, plotProbe.ref), + testName + "-snd" + n) + val terminationProbe = TestProbe() + terminationProbe.watch(snd) snd ! Run - (snd, p) + (snd, terminationProbe, plotProbe) } senders.foreach { - case (snd, p) ⇒ - val t = if (snd == senders.head._1) barrierTimeout else 10.seconds - p.expectTerminated(snd, t) + case (snd, terminationProbe, plotProbe) ⇒ + if (snd == senders.head._1) { + terminationProbe.expectTerminated(snd, barrierTimeout) + val plotResult = plotProbe.expectMsgType[PlotResult] + plot = plot.addAll(plotResult) + } else + terminationProbe.expectTerminated(snd, 10.seconds) } enterBarrier(testName + "-done") } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala new file mode 100644 index 0000000000..08858e62f1 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +final case class PlotResult(values: Vector[(String, Number)] = Vector.empty) { + + def add(key: String, value: Number): PlotResult = + copy(values = values :+ (key -> value)) + + def addAll(p: PlotResult): PlotResult = + copy(values ++ p.values) + + def csvLabels: String = values.map(_._1).mkString("\"", "\",\"", "\"") + + def csvValues: String = values.map(_._2).mkString("\"", "\",\"", "\"") + + // this can be split to two lines with bash: cut -d':' -f2,3 | tr ':' $'\n' + def csv(name: String): String = s"PLOT_${name}:${csvLabels}:${csvValues}" + +} + +final case class LatencyPlots(plot50: PlotResult = PlotResult(), plot90: PlotResult = PlotResult(), plot99: PlotResult = PlotResult())