From 4eb76df3b78353c6bab834d50b458876297ff5dd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Apr 2016 17:36:28 +0200 Subject: [PATCH 1/3] improve Aeron source and sink * use FragmentAssembler for large messages * no need for aeron factory * some logging/listeners of aeron events and exceptions --- .../scala/akka/remote/artery/AeronSink.scala | 13 +++-- .../akka/remote/artery/AeronSource.scala | 57 +++++++++++-------- .../akka/remote/artery/ArterySubsystem.scala | 5 +- .../scala/akka/remote/artery/Transport.scala | 46 +++++++++++++-- 4 files changed, 84 insertions(+), 37 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index 637bd8e7e6..06296c6e78 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -25,7 +25,7 @@ object AeronSink { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] { +class AeronSink(channel: String, aeron: Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] { import AeronSink._ val in: Inlet[Bytes] = Inlet("AeronSink") @@ -36,10 +36,10 @@ class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkSha private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024)) private val streamId = 10 - private val pub = aeron().addPublication(channel, streamId) + private val pub = aeron.addPublication(channel, streamId) private val idleStrategy = new BackoffIdleStrategy( 100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100)) - private val retries = 120 + private val retries = 130 private var backoffCount = retries private var lastMsgSize = 0 @@ -68,13 +68,14 @@ class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkSha if (backoffCount == 1) { println(s"# drop") // FIXME pull(in) // drop it - } else if (backoffCount <= 5) { + } else if (backoffCount <= 15) { // TODO Instead of using the scheduler we should handoff the task of // retrying/polling to a separate thread that performs the polling for // all sources/sinks and notifies back when there is some news. - // println(s"# scheduled backoff ${6 - backoffCount}") // FIXME backoffCount -= 1 - if (backoffCount <= 2) + if (backoffCount <= 10) + println(s"# snd backoff $backoffCount") // FIXME + if (backoffCount <= 10) scheduleOnce(Backoff, 50.millis) else scheduleOnce(Backoff, 1.millis) diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index cdc2333cfa..e1c90b4b56 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -15,6 +15,7 @@ import akka.stream.stage.GraphStageLogic import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic import io.aeron.Aeron +import io.aeron.FragmentAssembler import io.aeron.logbuffer.FragmentHandler import io.aeron.logbuffer.Header import org.agrona.DirectBuffer @@ -29,7 +30,7 @@ object AeronSource { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] { +class AeronSource(channel: String, aeron: Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] { import AeronSource._ val out: Outlet[Bytes] = Outlet("AeronSource") @@ -38,26 +39,30 @@ class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[Sourc override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with OutHandler { - private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(256)) + private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024)) private val streamId = 10 - private val sub = aeron().addSubscription(channel, streamId) + private val sub = aeron.addSubscription(channel, streamId) private val running = new AtomicBoolean(true) + private val spinning = 20000 + private val yielding = 0 + private val parking = 50 private val idleStrategy = new BackoffIdleStrategy( - 100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100)) - private val retries = 115 - private var backoffCount = retries + spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100)) + private val idleStrategyRetries = spinning + yielding + parking + private var backoffCount = idleStrategyRetries + private val backoffDuration1 = 1.millis + private val backoffDuration2 = 50.millis + private var messageReceived = false - val receiveMessage = getAsyncCallback[Bytes] { data ⇒ - push(out, data) - } - - val fragmentHandler: FragmentHandler = new FragmentHandler { + // the fragmentHandler is called from `poll` in same thread, i.e. no async callback is needed + val fragmentHandler = new FragmentAssembler(new FragmentHandler { override def onFragment(buffer: DirectBuffer, offset: Int, length: Int, header: Header): Unit = { + messageReceived = true val data = Array.ofDim[Byte](length) buffer.getBytes(offset, data); - receiveMessage.invoke(data) + push(out, data) } - } + }) override def postStop(): Unit = { running.set(false) @@ -67,29 +72,31 @@ class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[Sourc // OutHandler override def onPull(): Unit = { idleStrategy.reset() - backoffCount = retries + backoffCount = idleStrategyRetries subscriberLoop() } @tailrec private def subscriberLoop(): Unit = if (running.get) { + messageReceived = false // will be set by the fragmentHandler if got full msg + // we only poll 1 fragment, otherwise we would have to use another buffer for + // received messages that can't be pushed val fragmentsRead = sub.poll(fragmentHandler, 1) - if (fragmentsRead <= 0) { + if (fragmentsRead > 0 && !messageReceived) + subscriberLoop() // recursive, read more fragments + else if (fragmentsRead <= 0) { // TODO the backoff strategy should be measured and tuned - if (backoffCount <= 0) { + backoffCount -= 1 + if (backoffCount > 0) { + idleStrategy.idle() + subscriberLoop() // recursive + } else if (backoffCount > -1000) { // TODO Instead of using the scheduler we should handoff the task of // retrying/polling to a separate thread that performs the polling for // all sources/sinks and notifies back when there is some news. - // println(s"# scheduled backoff ${0 - backoffCount + 1}") // FIXME - backoffCount -= 1 - if (backoffCount <= -5) - scheduleOnce(Backoff, 50.millis) - else - scheduleOnce(Backoff, 1.millis) + scheduleOnce(Backoff, backoffDuration1) } else { - idleStrategy.idle() - backoffCount -= 1 - subscriberLoop() // recursive + scheduleOnce(Backoff, backoffDuration2) } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala index 714ea0c51e..bf950c5591 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala @@ -96,7 +96,7 @@ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: R * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific * remote address. */ -private[remote] class Association( +private[akka] class Association( val materializer: Materializer, val remoteAddress: Address, val transport: Transport) { @@ -113,7 +113,8 @@ private[remote] class Association( // Idempotent def associate(): Unit = { - queue = Source.queue(256, OverflowStrategy.dropBuffer).to(sink).run()(materializer) + if (queue eq null) + queue = Source.queue(256, OverflowStrategy.dropBuffer).to(sink).run()(materializer) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala index 88c55b3ff4..1b0302b126 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala @@ -24,12 +24,17 @@ import akka.event.LoggingAdapter import akka.event.Logging import io.aeron.driver.MediaDriver import io.aeron.Aeron +import org.agrona.ErrorHandler +import io.aeron.AvailableImageHandler +import io.aeron.Image +import io.aeron.UnavailableImageHandler +import io.aeron.exceptions.ConductorServiceTimeoutException /** * INTERNAL API */ // FIXME: Replace the codec with a custom made, hi-perf one -private[remote] class Transport( +private[akka] class Transport( val localAddress: Address, val system: ExtendedActorSystem, val materializer: Materializer, @@ -45,14 +50,47 @@ private[remote] class Transport( private val aeron = { val ctx = new Aeron.Context + + ctx.availableImageHandler(new AvailableImageHandler { + override def onAvailableImage(img: Image): Unit = { + if (log.isDebugEnabled) + log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") + } + }) + ctx.unavailableImageHandler(new UnavailableImageHandler { + override def onUnavailableImage(img: Image): Unit = { + if (log.isDebugEnabled) + log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") + // FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable + } + }) + ctx.errorHandler(new ErrorHandler { + override def onError(cause: Throwable): Unit = { + cause match { + case e: ConductorServiceTimeoutException ⇒ + // Timeout between service calls + log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}") + + case _ ⇒ + log.error(cause, s"Aeron error, ${cause.getMessage}") + } + } + }) // TODO also support external media driver - val driver = MediaDriver.launchEmbedded() + val driverContext = new MediaDriver.Context + // FIXME settings from config + driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.driverTimeoutMs(SECONDS.toNanos(10)) + val driver = MediaDriver.launchEmbedded(driverContext) + ctx.aeronDirectoryName(driver.aeronDirectoryName) Aeron.connect(ctx) } def start(): Unit = { - Source.fromGraph(new AeronSource(inboundChannel, () ⇒ aeron)) + Source.fromGraph(new AeronSource(inboundChannel, aeron)) + .async // FIXME use dedicated dispatcher for AeronSource .map(ByteString.apply) // TODO we should use ByteString all the way .via(inboundFlow) .runWith(Sink.ignore) @@ -71,7 +109,7 @@ private[remote] class Transport( Flow.fromGraph(killSwitch.flow[Send]) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel, () ⇒ aeron)) + .to(new AeronSink(outboundChannel, aeron)) } // TODO: Try out parallelized serialization (mapAsync) for performance From 263e3a3b9976c2220124800d20c25e769f41ff05 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Apr 2016 17:38:26 +0200 Subject: [PATCH 2/3] add Artery throughput and latency tests * as multi-node tests * adjust multi-node-testkit to use the hostname for Artery * update HdrHistogram version --- .../akka/remote/testkit/MultiNodeSpec.scala | 1 + .../artery/AeronStreamLatencySpec.scala | 212 ++++++++++++++ .../artery/AeronStreamMaxThroughputSpec.scala | 194 ++++++++++++ .../akka/remote/artery/LatencySpec.scala | 270 +++++++++++++++++ .../remote/artery/MaxThroughputSpec.scala | 277 ++++++++++++++++++ akka-remote/src/main/resources/reference.conf | 8 +- .../scala/akka/remote/RemoteSettings.scala | 7 +- .../scala/akka/remote/artery/AeronSink.scala | 2 - .../akka/remote/artery}/RateReporter.java | 2 +- .../akka/remote/artery/AeronStreamsApp.scala | 77 +++-- .../artery/RemoteSendConsistencySpec.scala | 16 +- .../akka/testkit/metrics/HdrHistogram.scala | 4 +- project/AkkaBuild.scala | 2 +- project/Dependencies.scala | 5 +- 14 files changed, 1033 insertions(+), 44 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala rename {akka-bench-jmh/src/main/java/akka/aeron => akka-remote/src/test/java/akka/remote/artery}/RateReporter.java (99%) rename akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala => akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala (75%) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index aae55a1db7..b8969bfc6a 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -196,6 +196,7 @@ object MultiNodeSpec { private[testkit] val nodeConfig = mapToConfig(Map( "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", + "akka.remote.artery.hostname" -> selfName, "akka.remote.netty.tcp.hostname" -> selfName, "akka.remote.netty.tcp.port" -> selfPort)) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala new file mode 100644 index 0000000000..c6cd4a7fe1 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -0,0 +1,212 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLongArray + +import scala.concurrent.duration._ + +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.stream.ActorMaterializer +import akka.stream.ThrottleMode +import akka.stream.scaladsl.Source +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import io.aeron.Aeron +import io.aeron.driver.MediaDriver +import org.HdrHistogram.Histogram + +object AeronStreamLatencySpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3) + akka.test.AeronStreamLatencySpec.totalMessagesFactor = 1.0 + akka.test.AeronStreamLatencySpec.repeatCount = 1 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery.enabled = off + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20521 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20522 + } + + final case class TestSettings( + testName: String, + messageRate: Int, // msg/s + payloadSize: Int, + repeat: Int) + +} + +class AeronStreamLatencySpecMultiJvmNode1 extends AeronStreamLatencySpec +class AeronStreamLatencySpecMultiJvmNode2 extends AeronStreamLatencySpec + +abstract class AeronStreamLatencySpec + extends MultiNodeSpec(AeronStreamLatencySpec) + with STMultiNodeSpec with ImplicitSender { + + import AeronStreamLatencySpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamLatencySpec.totalMessagesFactor") + val repeatCount = system.settings.config.getInt("akka.test.AeronStreamLatencySpec.repeatCount") + + val aeron = { + val ctx = new Aeron.Context + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + lazy implicit val mat = ActorMaterializer()(system) + import system.dispatcher + + override def initialParticipants = roles.size + + def channel(roleName: RoleName) = { + val a = node(roleName).address + s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}" + } + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = { + import scala.collection.JavaConverters._ + val percentiles = histogram.percentiles(5) + def percentile(p: Double): Double = + percentiles.iterator().asScala.collectFirst { + case value if (p - 0.5) < value.getPercentileLevelIteratedTo && + value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0 + }.getOrElse(Double.NaN) + + println(s"=== AeronStreamLatency $testName: RTT " + + f"50%%ile: ${percentile(50.0)}%.0f µs, " + + f"90%%ile: ${percentile(90.0)}%.0f µs, " + + f"99%%ile: ${percentile(99.0)}%.0f µs, ") + println("Histogram of RTT latencies in microseconds.") + histogram.outputPercentileDistribution(System.out, 1000.0) + } + + val scenarios = List( + TestSettings( + testName = "rate-100-size-100", + messageRate = 100, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-100", + messageRate = 1000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-10000-size-100", + messageRate = 10000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-1k", + messageRate = 1000, + payloadSize = 1000, + repeat = repeatCount)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + + runOn(first) { + val payload = ("0" * payloadSize).getBytes("utf-8") + // by default run for 2 seconds, but can be adjusted with the totalMessagesFactor + val totalMessages = (2 * messageRate * totalMessagesFactor).toInt + val sendTimes = new AtomicLongArray(totalMessages) + val histogram = new Histogram(SECONDS.toNanos(10), 3) + + val rep = reporter(testName) + val barrier = new CyclicBarrier(2) + val count = new AtomicInteger + Source.fromGraph(new AeronSource(channel(first), aeron)) + .runForeach { bytes ⇒ + if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") + rep.onMessage(1, payloadSize) + val c = count.incrementAndGet() + val d = System.nanoTime() - sendTimes.get(c - 1) + histogram.recordValue(d) + if (c == totalMessages) { + printTotal(testName, bytes.length, histogram) + barrier.await() // this is always the last party + } + } + + for (n ← 1 to repeat) { + histogram.reset() + count.set(0) + + Source(1 to totalMessages) + .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) + .map { n ⇒ + sendTimes.set(n - 1, System.nanoTime()) + payload + } + .runWith(new AeronSink(channel(second), aeron)) + + barrier.await((totalMessages / messageRate) + 10, SECONDS) + } + + rep.halt() + } + + enterBarrier("after-" + testName) + } + + "Latency of Aeron Streams" must { + + "start echo" in { + runOn(second) { + // just echo back + Source.fromGraph(new AeronSource(channel(second), aeron)) + .runWith(new AeronSink(channel(first), aeron)) + } + enterBarrier("echo-started") + } + + for (s ← scenarios) { + s"be low for ${s.testName}, at ${s.messageRate} msg/s, payloadSize = ${s.payloadSize}" in test(s) + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala new file mode 100644 index 0000000000..22b42bbbd0 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -0,0 +1,194 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.net.InetAddress +import java.util.concurrent.Executors + +import scala.collection.AbstractIterator +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Source +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +import io.aeron.Aeron +import io.aeron.driver.MediaDriver + +object AeronStreamMaxThroughputSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (20) + akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor = 1.0 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery.enabled = off + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20511 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20512 + } + + final case class TestSettings( + testName: String, + totalMessages: Long, + payloadSize: Int) + + def iterate(start: Long, end: Long): Iterator[Long] = new AbstractIterator[Long] { + private[this] var first = true + private[this] var acc = start + def hasNext: Boolean = acc < end + def next(): Long = { + if (!hasNext) throw new NoSuchElementException("next on empty iterator") + if (first) first = false + else acc += 1 + + acc + } + } +} + +class AeronStreamMaxThroughputSpecMultiJvmNode1 extends AeronStreamMaxThroughputSpec +class AeronStreamMaxThroughputSpecMultiJvmNode2 extends AeronStreamMaxThroughputSpec + +abstract class AeronStreamMaxThroughputSpec + extends MultiNodeSpec(AeronStreamMaxThroughputSpec) + with STMultiNodeSpec with ImplicitSender { + + import AeronStreamMaxThroughputSpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor") + + val aeron = { + val ctx = new Aeron.Context + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + lazy implicit val mat = ActorMaterializer()(system) + import system.dispatcher + + def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong + + override def initialParticipants = roles.size + + def channel(roleName: RoleName) = { + val a = node(roleName).address + s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}" + } + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def printTotal(testName: String, total: Long, startTime: Long, payloadSize: Long): Unit = { + val d = (System.nanoTime - startTime).nanos.toMillis + println(f"=== AeronStreamMaxThroughput $testName: " + + f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s, " + + s"payload size $payloadSize, " + + s"$d ms to deliver $total messages") + } + + val scenarios = List( + TestSettings( + testName = "size-100", + totalMessages = adjustedTotalMessages(1000000), + payloadSize = 100), + TestSettings( + testName = "size-1k", + totalMessages = adjustedTotalMessages(100000), + payloadSize = 1000), + TestSettings( + testName = "size-10k", + totalMessages = adjustedTotalMessages(10000), + payloadSize = 10000)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + val receiverName = testName + "-rcv" + + runOn(second) { + val rep = reporter(testName) + var t0 = System.nanoTime() + var count = 0L + val done = TestLatch(1) + Source.fromGraph(new AeronSource(channel(second), aeron)) + .runForeach { bytes ⇒ + rep.onMessage(1, bytes.length) + count += 1 + if (count == 1) { + t0 = System.nanoTime() + } else if (count == totalMessages) { + printTotal(testName, totalMessages, t0, payloadSize) + done.countDown() + } + }.onFailure { + case e ⇒ + e.printStackTrace + } + + enterBarrier(receiverName + "-started") + Await.ready(done, barrierTimeout) + rep.halt() + enterBarrier(testName + "-done") + } + + runOn(first) { + enterBarrier(receiverName + "-started") + + val payload = ("0" * payloadSize).getBytes("utf-8") + val t0 = System.nanoTime() + Source.fromIterator(() ⇒ iterate(1, totalMessages)) + .map { n ⇒ payload } + .runWith(new AeronSink(channel(second), aeron)) + + enterBarrier(testName + "-done") + } + + enterBarrier("after-" + testName) + } + + "Max throughput of Aeron Streams" must { + + for (s ← scenarios) { + s"be great for ${s.testName}, payloadSize = ${s.payloadSize}" in test(s) + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala new file mode 100644 index 0000000000..3cc9fca890 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -0,0 +1,270 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.net.InetAddress +import java.util.concurrent.Executors +import scala.collection.AbstractIterator +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Source +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import io.aeron.Aeron +import io.aeron.driver.MediaDriver +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLongArray +import org.HdrHistogram.Histogram +import akka.stream.ThrottleMode +import java.io.StringWriter +import java.io.PrintStream +import java.io.OutputStreamWriter +import java.io.BufferedOutputStream +import java.io.ByteArrayOutputStream + +object LatencySpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3) + akka.test.LatencySpec.totalMessagesFactor = 1.0 + akka.test.LatencySpec.repeatCount = 1 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery { + enabled = on + } + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20501 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20502 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + final case object Reset + + def echoProps(): Props = + Props(new Echo) + + class Echo extends Actor { + // FIXME to avoid using new RemoteActorRef each time + var cachedSender: ActorRef = null + + def receive = { + case Reset ⇒ + cachedSender = null + sender() ! Reset + case msg ⇒ + if (cachedSender == null) cachedSender = sender() + cachedSender ! msg + } + } + + def receiverProps(reporter: RateReporter, settings: TestSettings, totalMessages: Int, + sendTimes: AtomicLongArray, histogram: Histogram): Props = + Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram)) + + class Receiver(reporter: RateReporter, settings: TestSettings, totalMessages: Int, + sendTimes: AtomicLongArray, histogram: Histogram) extends Actor { + import settings._ + + var count = 0 + + def receive = { + case bytes: Array[Byte] ⇒ + if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") + reporter.onMessage(1, payloadSize) + count += 1 + val d = System.nanoTime() - sendTimes.get(count - 1) + histogram.recordValue(d) + if (count == totalMessages) { + printTotal(testName, bytes.length, histogram) + context.stop(self) + } + } + + def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = { + import scala.collection.JavaConverters._ + val percentiles = histogram.percentiles(5) + def percentile(p: Double): Double = + percentiles.iterator().asScala.collectFirst { + case value if (p - 0.5) < value.getPercentileLevelIteratedTo && + value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0 + }.getOrElse(Double.NaN) + + println(s"=== Latency $testName: RTT " + + f"50%%ile: ${percentile(50.0)}%.0f µs, " + + f"90%%ile: ${percentile(90.0)}%.0f µs, " + + f"99%%ile: ${percentile(99.0)}%.0f µs, ") + println("Histogram of RTT latencies in microseconds.") + histogram.outputPercentileDistribution(System.out, 1000.0) + } + } + + final case class TestSettings( + testName: String, + messageRate: Int, // msg/s + payloadSize: Int, + repeat: Int) + +} + +class LatencySpecMultiJvmNode1 extends LatencySpec +class LatencySpecMultiJvmNode2 extends LatencySpec + +abstract class LatencySpec + extends MultiNodeSpec(LatencySpec) + with STMultiNodeSpec with ImplicitSender { + + import LatencySpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor") + val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount") + + val aeron = { + val ctx = new Aeron.Context + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + lazy implicit val mat = ActorMaterializer()(system) + import system.dispatcher + + override def initialParticipants = roles.size + + def channel(roleName: RoleName) = { + val a = node(roleName).address + s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}" + } + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def identifyEcho(name: String = "echo", r: RoleName = second): ActorRef = { + system.actorSelection(node(r) / "user" / name) ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + val scenarios = List( + TestSettings( + testName = "rate-100-size-100", + messageRate = 100, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-100", + messageRate = 1000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-10000-size-100", + messageRate = 10000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-1k", + messageRate = 1000, + payloadSize = 1000, + repeat = repeatCount)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + + runOn(first) { + val payload = ("0" * payloadSize).getBytes("utf-8") + // by default run for 2 seconds, but can be adjusted with the totalMessagesFactor + val totalMessages = (2 * messageRate * totalMessagesFactor).toInt + val sendTimes = new AtomicLongArray(totalMessages) + val histogram = new Histogram(SECONDS.toNanos(10), 3) + val rep = reporter(testName) + + val echo = identifyEcho() + + for (n ← 1 to repeat) { + echo ! Reset + expectMsg(Reset) + histogram.reset() + val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram)) + + Source(1 to totalMessages) + .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) + .runForeach { n ⇒ + sendTimes.set(n - 1, System.nanoTime()) + echo.tell(payload, receiver) + } + + watch(receiver) + expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds) + } + + rep.halt() + } + + enterBarrier("after-" + testName) + } + + "Latency of Artery" must { + + "start echo" in { + runOn(second) { + // just echo back + system.actorOf(echoProps, "echo") + } + enterBarrier("echo-started") + } + + for (s ← scenarios) { + s"be low for ${s.testName}, at ${s.messageRate} msg/s, payloadSize = ${s.payloadSize}" in test(s) + } + + // TODO add more tests + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala new file mode 100644 index 0000000000..e56a6f1ec2 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -0,0 +1,277 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit.NANOSECONDS +import scala.concurrent.duration._ +import akka.actor._ +import akka.remote.RemoteActorRefProvider +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import java.net.InetAddress + +object MaxThroughputSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (20) + akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery { + enabled = on + } + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20501 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20502 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + case object Run + sealed trait Echo extends DeadLetterSuppression + final case object Start extends Echo + final case object End extends Echo + final case class EndResult(totalReceived: Long) + final case class FlowControl(burstStartTime: Long) extends Echo + + def receiverProps(reporter: RateReporter, payloadSize: Int): Props = + Props(new Receiver(reporter, payloadSize)) + + class Receiver(reporter: RateReporter, payloadSize: Int) extends Actor { + var c = 0L + + def receive = { + case Start ⇒ + c = 0 + sender() ! Start + case End ⇒ + sender() ! EndResult(c) + context.stop(self) + case m: Echo ⇒ + sender() ! m + case msg: Array[Byte] ⇒ + if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message") + reporter.onMessage(1, payloadSize) + c += 1 + } + } + + def senderProps(target: ActorRef, testSettings: TestSettings): Props = + Props(new Sender(target, testSettings)) + + class Sender(target: ActorRef, testSettings: TestSettings) extends Actor { + import testSettings._ + val payload = ("0" * testSettings.payloadSize).getBytes("utf-8") + var startTime = 0L + var remaining = totalMessages + var maxRoundTripMillis = 0L + + def receive = { + case Run ⇒ + // first some warmup + sendBatch() + // then Start, which will echo back here + target ! Start + + case Start ⇒ + println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " + + s"$burstSize and payload size $payloadSize") + startTime = System.nanoTime + remaining = totalMessages + // have a few batches in flight to make sure there are always messages to send + (1 to 3).foreach { _ ⇒ + val t0 = System.nanoTime() + sendBatch() + sendFlowControl(t0) + } + + case c @ FlowControl(t0) ⇒ + val now = System.nanoTime() + val duration = NANOSECONDS.toMillis(now - t0) + maxRoundTripMillis = math.max(maxRoundTripMillis, duration) + + sendBatch() + sendFlowControl(now) + + case EndResult(totalReceived) ⇒ + val took = NANOSECONDS.toMillis(System.nanoTime - startTime) + val throughtput = (totalReceived * 1000.0 / took).toInt + println( + s"=== MaxThroughput ${self.path.name}: " + + s"throughtput $throughtput msg/s, " + + s"dropped ${totalMessages - totalReceived}, " + + s"max round-trip $maxRoundTripMillis ms, " + + s"burst size $burstSize, " + + s"payload size $payloadSize, " + + s"$took ms to deliver $totalReceived messages") + context.stop(self) + } + + def sendBatch(): Unit = { + val batchSize = math.min(remaining, burstSize) + var i = 0 + while (i < batchSize) { + target ! payload + i += 1 + } + remaining -= batchSize + } + + def sendFlowControl(t0: Long): Unit = { + if (remaining <= 0) + target ! End + else + target ! FlowControl(t0) + } + } + + final case class TestSettings( + testName: String, + totalMessages: Long, + burstSize: Int, + payloadSize: Int, + senderReceiverPairs: Int) + +} + +class MaxThroughputSpecMultiJvmNode1 extends MaxThroughputSpec +class MaxThroughputSpecMultiJvmNode2 extends MaxThroughputSpec + +abstract class MaxThroughputSpec + extends MultiNodeSpec(MaxThroughputSpec) + with STMultiNodeSpec with ImplicitSender { + + import MaxThroughputSpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor") + + def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong + + override def initialParticipants = roles.size + + def remoteSettings = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].remoteSettings + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def identifyReceiver(name: String, r: RoleName = second): ActorRef = { + system.actorSelection(node(r) / "user" / name) ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + val scenarios = List( + TestSettings( + testName = "1-to-1", + totalMessages = adjustedTotalMessages(20000), + burstSize = 1000, + payloadSize = 100, + senderReceiverPairs = 1), + TestSettings( + testName = "1-to-1-size-1k", + totalMessages = adjustedTotalMessages(20000), + burstSize = 1000, + payloadSize = 1000, + senderReceiverPairs = 1), + TestSettings( + testName = "1-to-1-size-10k", + totalMessages = adjustedTotalMessages(10000), + burstSize = 1000, + payloadSize = 10000, + senderReceiverPairs = 1), + TestSettings( + testName = "5-to-5", + totalMessages = adjustedTotalMessages(20000), + burstSize = 1000, + payloadSize = 100, + senderReceiverPairs = 5)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + val receiverName = testName + "-rcv" + + runOn(second) { + val rep = reporter(testName) + for (n ← 1 to senderReceiverPairs) { + val receiver = system.actorOf(receiverProps(rep, payloadSize), receiverName + n) + } + enterBarrier(receiverName + "-started") + enterBarrier(testName + "-done") + rep.halt() + } + + runOn(first) { + enterBarrier(receiverName + "-started") + val senders = for (n ← 1 to senderReceiverPairs) yield { + val receiver = identifyReceiver(receiverName + n) + val snd = system.actorOf(senderProps(receiver, testSettings), testName + "-snd" + n) + val p = TestProbe() + p.watch(snd) + snd ! Run + (snd, p) + } + senders.foreach { + case (snd, p) ⇒ + val t = if (snd == senders.head._1) barrierTimeout else 10.seconds + p.expectTerminated(snd, t) + } + enterBarrier(testName + "-done") + } + + enterBarrier("after-" + testName) + } + + "Max throughput of Artery" must { + + for (s ← scenarios) { + s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s) + } + + // TODO add more tests, such as 5-to-5 sender receiver pairs + + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index e2aa5d8538..94e3de17fe 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -75,7 +75,13 @@ akka { artery { enabled = off port = 20200 - hostname = localhost + + # The hostname or ip clients should connect to. + # InetAddress.getLocalHost.getHostAddress is used if empty or + # "" is specified. + # InetAddress.getLocalHost.getHostName is used if + # "" is specified. + hostname = "" } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 97806381ff..6493489ba1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -15,6 +15,7 @@ import akka.event.Logging import akka.event.Logging.LogLevel import akka.ConfigurationException import java.util.Locale +import java.net.InetAddress final class RemoteSettings(val config: Config) { import config._ @@ -22,7 +23,11 @@ final class RemoteSettings(val config: Config) { val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled") val ArteryPort: Int = getInt("akka.remote.artery.port") - val ArteryHostname: String = getString("akka.remote.artery.hostname") + val ArteryHostname: String = getString("akka.remote.artery.hostname") match { + case "" | "" ⇒ InetAddress.getLocalHost.getHostAddress + case "" ⇒ InetAddress.getLocalHost.getHostName + case other ⇒ other + } val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index 06296c6e78..d4261ea99d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -73,8 +73,6 @@ class AeronSink(channel: String, aeron: Aeron) extends GraphStage[SinkShape[Aero // retrying/polling to a separate thread that performs the polling for // all sources/sinks and notifies back when there is some news. backoffCount -= 1 - if (backoffCount <= 10) - println(s"# snd backoff $backoffCount") // FIXME if (backoffCount <= 10) scheduleOnce(Backoff, 50.millis) else diff --git a/akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java b/akka-remote/src/test/java/akka/remote/artery/RateReporter.java similarity index 99% rename from akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java rename to akka-remote/src/test/java/akka/remote/artery/RateReporter.java index e042e58bab..0e455fc0ab 100644 --- a/akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java +++ b/akka-remote/src/test/java/akka/remote/artery/RateReporter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package akka.aeron; +package akka.remote.artery; import java.util.concurrent.locks.LockSupport; diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala similarity index 75% rename from akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala rename to akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala index 9857db6e69..ce25f329a0 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala @@ -1,4 +1,4 @@ -package akka.aeron +package akka.remote.artery import scala.concurrent.duration._ import akka.actor.ActorSystem @@ -17,10 +17,13 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.atomic.AtomicLongArray import akka.stream.ThrottleMode -import akka.remote.artery.AeronSink -import akka.remote.artery.AeronSource +import org.agrona.ErrorHandler +import io.aeron.AvailableImageHandler +import io.aeron.UnavailableImageHandler +import io.aeron.Image +import io.aeron.AvailableImageHandler -object AeronStreams { +object AeronStreamsApp { val channel1 = "aeron:udp?endpoint=localhost:40123" val channel2 = "aeron:udp?endpoint=localhost:40124" @@ -32,7 +35,27 @@ object AeronStreams { lazy val aeron = { val ctx = new Aeron.Context - val driver = MediaDriver.launchEmbedded() + ctx.errorHandler(new ErrorHandler { + override def onError(cause: Throwable) { + println(s"# Aeron onError " + cause) // FIXME + } + }) + ctx.availableImageHandler(new AvailableImageHandler { + override def onAvailableImage(img: Image): Unit = { + println(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") + } + }) + ctx.unavailableImageHandler(new UnavailableImageHandler { + override def onUnavailableImage(img: Image): Unit = { + println(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") + } + }) + + val driverContext = new MediaDriver.Context + driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.driverTimeoutMs(SECONDS.toNanos(10)) + val driver = MediaDriver.launchEmbedded(driverContext) ctx.aeronDirectoryName(driver.aeronDirectoryName) Aeron.connect(ctx) } @@ -111,12 +134,12 @@ object AeronStreams { var t0 = System.nanoTime() var count = 0L var payloadSize = 0L - Source.fromGraph(new AeronSource(channel1, () => aeron)) - .map { bytes => + Source.fromGraph(new AeronSource(channel1, aeron)) + .map { bytes ⇒ r.onMessage(1, bytes.length) bytes } - .runForeach { bytes => + .runForeach { bytes ⇒ count += 1 if (count == 1) { t0 = System.nanoTime() @@ -126,7 +149,7 @@ object AeronStreams { printTotal(throughputN, "receive", t0, payloadSize) } }.onFailure { - case e => + case e ⇒ e.printStackTrace exit(-1) } @@ -138,30 +161,30 @@ object AeronStreams { val r = reporter val t0 = System.nanoTime() Source(1 to throughputN) - .map { n => + .map { n ⇒ if (n == throughputN) { exit(0) printTotal(throughputN, "send", t0, payload.length) } n } - .map { _ => + .map { _ ⇒ r.onMessage(1, payload.length) payload } - .runWith(new AeronSink(channel1, () => aeron)) + .runWith(new AeronSink(channel1, aeron)) } def runEchoReceiver(): Unit = { // just echo back on channel2 reporterExecutor.execute(reporter) val r = reporter - Source.fromGraph(new AeronSource(channel1, () => aeron)) - .map { bytes => + Source.fromGraph(new AeronSource(channel1, aeron)) + .map { bytes ⇒ r.onMessage(1, bytes.length) bytes } - .runWith(new AeronSink(channel2, () => aeron)) + .runWith(new AeronSink(channel2, aeron)) } def runEchoSender(): Unit = { @@ -173,12 +196,12 @@ object AeronStreams { var repeat = 3 val count = new AtomicInteger var t0 = System.nanoTime() - Source.fromGraph(new AeronSource(channel2, () => aeron)) - .map { bytes => + Source.fromGraph(new AeronSource(channel2, aeron)) + .map { bytes ⇒ r.onMessage(1, bytes.length) bytes } - .runForeach { bytes => + .runForeach { bytes ⇒ val c = count.incrementAndGet() val d = System.nanoTime() - sendTimes.get(c - 1) if (c % (latencyN / 10) == 0) @@ -189,7 +212,7 @@ object AeronStreams { barrier.await() // this is always the last party } }.onFailure { - case e => + case e ⇒ e.printStackTrace exit(-1) } @@ -202,13 +225,13 @@ object AeronStreams { Source(1 to latencyN) .throttle(latencyRate, 1.second, latencyRate / 10, ThrottleMode.Shaping) - .map { n => + .map { n ⇒ if (n % (latencyN / 10) == 0) println(s"# send offset $n") // FIXME sendTimes.set(n - 1, System.nanoTime()) payload } - .runWith(new AeronSink(channel1, () => aeron)) + .runWith(new AeronSink(channel1, aeron)) barrier.await() } @@ -218,12 +241,12 @@ object AeronStreams { def runDebugReceiver(): Unit = { import system.dispatcher - Source.fromGraph(new AeronSource(channel1, () => aeron)) - .map(bytes => new String(bytes, "utf-8")) - .runForeach { s => + Source.fromGraph(new AeronSource(channel1, aeron)) + .map(bytes ⇒ new String(bytes, "utf-8")) + .runForeach { s ⇒ println(s) }.onFailure { - case e => + case e ⇒ e.printStackTrace exit(-1) } @@ -233,12 +256,12 @@ object AeronStreams { val fill = "0000" Source(1 to 1000) .throttle(1, 1.second, 1, ThrottleMode.Shaping) - .map { n => + .map { n ⇒ val s = (fill + n.toString).takeRight(4) println(s) s.getBytes("utf-8") } - .runWith(new AeronSink(channel1, () => aeron)) + .runWith(new AeronSink(channel1, aeron)) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 34013720c6..b6c320206c 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -1,5 +1,6 @@ package akka.remote.artery +import scala.concurrent.duration._ import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSystem, Identify, Props, RootActorPath } import akka.testkit.{ AkkaSpec, ImplicitSender } import com.typesafe.config.ConfigFactory @@ -12,6 +13,7 @@ object RemoteSendConsistencySpec { akka { actor.provider = "akka.remote.RemoteActorRefProvider" remote.artery.enabled = on + remote.artery.hostname = localhost } """ @@ -63,8 +65,8 @@ class RemoteSendConsistencySpec extends AkkaSpec(commonConfig) with ImplicitSend } val senderProps = Props(new Actor { - var counter = 1000 - remoteRef ! 1000 + var counter = 100 // FIXME try this test with 1000, why does it take so long? + remoteRef ! counter override def receive: Receive = { case i: Int ⇒ @@ -84,10 +86,12 @@ class RemoteSendConsistencySpec extends AkkaSpec(commonConfig) with ImplicitSend system.actorOf(senderProps) system.actorOf(senderProps) - expectMsg("success") - expectMsg("success") - expectMsg("success") - expectMsg("success") + within(10.seconds) { + expectMsg("success") + expectMsg("success") + expectMsg("success") + expectMsg("success") + } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala index 8c7e983d97..943f020050 100644 --- a/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala @@ -41,8 +41,8 @@ private[akka] class HdrHistogram( private def wrapHistogramOutOfBoundsException(value: Long, ex: ArrayIndexOutOfBoundsException): IllegalArgumentException = new IllegalArgumentException(s"Given value $value can not be stored in this histogram " + - s"(min: ${hist.getLowestTrackableValue}, max: ${hist.getHighestTrackableValue}})", ex) + s"(min: ${hist.getLowestDiscernibleValue}, max: ${hist.getHighestTrackableValue}})", ex) - def getData = hist.copy().getHistogramData + def getData = hist.copy() } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 1530ee7d50..2973c04915 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -132,7 +132,7 @@ object AkkaBuild extends Build { lazy val remoteTests = Project( id = "akka-remote-tests", base = file("akka-remote-tests"), - dependencies = Seq(actorTests % "test->test", multiNodeTestkit) + dependencies = Seq(actorTests % "test->test", remote % "test->test", multiNodeTestkit) ) configs (MultiJvm) lazy val cluster = Project( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b55dc7d954..7e09655211 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -69,7 +69,6 @@ object Dependencies { val aeronDriver = "io.aeron" % "aeron-driver" % "0.9.5" // ApacheV2 val aeronClient = "io.aeron" % "aeron-client" % "0.9.5" // ApacheV2 - val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" // CC0 object Docs { val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test" @@ -96,7 +95,7 @@ object Dependencies { val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2 val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.2" % "test" // ApacheV2 val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD - val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.1.4" % "test" // CC0 + val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" % "test" // CC0 val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram) // sigar logging @@ -165,7 +164,7 @@ object Dependencies { val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, hdrHistogram) + val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative) // akka stream & http From c01b3e8fedf9efea25abff9e92c0b90382d954e3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 22 Apr 2016 16:24:12 +0200 Subject: [PATCH 3/3] add output for trend plots * use Jenkins Plot plugin * output csv values to log and then use post build step to turn those into csv files that the plot plugin understands --- .../artery/AeronStreamLatencySpec.scala | 24 ++++++++++-- .../artery/AeronStreamMaxThroughputSpec.scala | 10 ++++- .../akka/remote/artery/LatencySpec.scala | 30 +++++++++++++-- .../remote/artery/MaxThroughputSpec.scala | 38 +++++++++++++------ .../scala/akka/remote/artery/PlotResult.scala | 23 +++++++++++ 5 files changed, 104 insertions(+), 21 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index c6cd4a7fe1..33556e5ec2 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -7,9 +7,7 @@ import java.util.concurrent.CyclicBarrier import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLongArray - import scala.concurrent.duration._ - import akka.actor._ import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig @@ -23,6 +21,7 @@ import com.typesafe.config.ConfigFactory import io.aeron.Aeron import io.aeron.driver.MediaDriver import org.HdrHistogram.Histogram +import java.util.concurrent.atomic.AtomicBoolean object AeronStreamLatencySpec extends MultiNodeConfig { val first = role("first") @@ -73,6 +72,8 @@ abstract class AeronStreamLatencySpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamLatencySpec.totalMessagesFactor") val repeatCount = system.settings.config.getInt("akka.test.AeronStreamLatencySpec.repeatCount") + var plots = LatencyPlots() + val aeron = { val ctx = new Aeron.Context val driver = MediaDriver.launchEmbedded() @@ -104,10 +105,15 @@ abstract class AeronStreamLatencySpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(first) { + println(plots.plot50.csv(system.name + "50")) + println(plots.plot90.csv(system.name + "90")) + println(plots.plot99.csv(system.name + "99")) + } super.afterAll() } - def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = { + def printTotal(testName: String, payloadSize: Long, histogram: Histogram, lastRepeat: Boolean): Unit = { import scala.collection.JavaConverters._ val percentiles = histogram.percentiles(5) def percentile(p: Double): Double = @@ -122,6 +128,14 @@ abstract class AeronStreamLatencySpec f"99%%ile: ${percentile(99.0)}%.0f µs, ") println("Histogram of RTT latencies in microseconds.") histogram.outputPercentileDistribution(System.out, 1000.0) + + // only use the last repeat for the plots + if (lastRepeat) { + plots = plots.copy( + plot50 = plots.plot50.add(testName, percentile(50.0)), + plot90 = plots.plot90.add(testName, percentile(90.0)), + plot99 = plots.plot99.add(testName, percentile(99.0))) + } } val scenarios = List( @@ -159,6 +173,7 @@ abstract class AeronStreamLatencySpec val rep = reporter(testName) val barrier = new CyclicBarrier(2) val count = new AtomicInteger + val lastRepeat = new AtomicBoolean(false) Source.fromGraph(new AeronSource(channel(first), aeron)) .runForeach { bytes ⇒ if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") @@ -167,7 +182,7 @@ abstract class AeronStreamLatencySpec val d = System.nanoTime() - sendTimes.get(c - 1) histogram.recordValue(d) if (c == totalMessages) { - printTotal(testName, bytes.length, histogram) + printTotal(testName, bytes.length, histogram, lastRepeat.get) barrier.await() // this is always the last party } } @@ -175,6 +190,7 @@ abstract class AeronStreamLatencySpec for (n ← 1 to repeat) { histogram.reset() count.set(0) + lastRepeat.set(n == repeat) Source(1 to totalMessages) .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 22b42bbbd0..1115a2a552 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -68,6 +68,7 @@ object AeronStreamMaxThroughputSpec extends MultiNodeConfig { acc } } + } class AeronStreamMaxThroughputSpecMultiJvmNode1 extends AeronStreamMaxThroughputSpec @@ -81,6 +82,8 @@ abstract class AeronStreamMaxThroughputSpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor") + var plot = PlotResult() + val aeron = { val ctx = new Aeron.Context val driver = MediaDriver.launchEmbedded() @@ -114,15 +117,20 @@ abstract class AeronStreamMaxThroughputSpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(second) { + println(plot.csv(system.name)) + } super.afterAll() } def printTotal(testName: String, total: Long, startTime: Long, payloadSize: Long): Unit = { val d = (System.nanoTime - startTime).nanos.toMillis + val throughput = 1000.0 * total / d println(f"=== AeronStreamMaxThroughput $testName: " + - f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s, " + + f"${throughput}%.03g msg/s, ${throughput * payloadSize}%.03g bytes/s, " + s"payload size $payloadSize, " + s"$d ms to deliver $total messages") + plot = plot.add(testName, throughput * payloadSize / 1024 / 1024) } val scenarios = List( diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index 3cc9fca890..322bc12735 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -93,11 +93,11 @@ object LatencySpec extends MultiNodeConfig { } def receiverProps(reporter: RateReporter, settings: TestSettings, totalMessages: Int, - sendTimes: AtomicLongArray, histogram: Histogram): Props = - Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram)) + sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef): Props = + Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram, plotsRef)) class Receiver(reporter: RateReporter, settings: TestSettings, totalMessages: Int, - sendTimes: AtomicLongArray, histogram: Histogram) extends Actor { + sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef) extends Actor { import settings._ var count = 0 @@ -130,6 +130,12 @@ object LatencySpec extends MultiNodeConfig { f"99%%ile: ${percentile(99.0)}%.0f µs, ") println("Histogram of RTT latencies in microseconds.") histogram.outputPercentileDistribution(System.out, 1000.0) + + val plots = LatencyPlots( + PlotResult().add(testName, percentile(50.0)), + PlotResult().add(testName, percentile(90.0)), + PlotResult().add(testName, percentile(99.0))) + plotsRef ! plots } } @@ -153,6 +159,8 @@ abstract class LatencySpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor") val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount") + var plots = LatencyPlots() + val aeron = { val ctx = new Aeron.Context val driver = MediaDriver.launchEmbedded() @@ -184,6 +192,11 @@ abstract class LatencySpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(first) { + println(plots.plot50.csv(system.name + "50")) + println(plots.plot90.csv(system.name + "90")) + println(plots.plot99.csv(system.name + "99")) + } super.afterAll() } @@ -226,12 +239,13 @@ abstract class LatencySpec val rep = reporter(testName) val echo = identifyEcho() + val plotProbe = TestProbe() for (n ← 1 to repeat) { echo ! Reset expectMsg(Reset) histogram.reset() - val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram)) + val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram, plotProbe.ref)) Source(1 to totalMessages) .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) @@ -242,6 +256,14 @@ abstract class LatencySpec watch(receiver) expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds) + val p = plotProbe.expectMsgType[LatencyPlots] + // only use the last repeat for the plots + if (n == repeat) { + plots = plots.copy( + plot50 = plots.plot50.addAll(p.plot50), + plot90 = plots.plot90.addAll(p.plot90), + plot99 = plots.plot99.addAll(p.plot99)) + } } rep.halt() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index e56a6f1ec2..2aedd70c94 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -87,10 +87,10 @@ object MaxThroughputSpec extends MultiNodeConfig { } } - def senderProps(target: ActorRef, testSettings: TestSettings): Props = - Props(new Sender(target, testSettings)) + def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef): Props = + Props(new Sender(target, testSettings, plotRef)) - class Sender(target: ActorRef, testSettings: TestSettings) extends Actor { + class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef) extends Actor { import testSettings._ val payload = ("0" * testSettings.payloadSize).getBytes("utf-8") var startTime = 0L @@ -126,15 +126,17 @@ object MaxThroughputSpec extends MultiNodeConfig { case EndResult(totalReceived) ⇒ val took = NANOSECONDS.toMillis(System.nanoTime - startTime) - val throughtput = (totalReceived * 1000.0 / took).toInt + val throughput = (totalReceived * 1000.0 / took) println( s"=== MaxThroughput ${self.path.name}: " + - s"throughtput $throughtput msg/s, " + + f"throughput ${throughput}%.03g msg/s, " + + f"${throughput * payloadSize}%.03g bytes/s, " + s"dropped ${totalMessages - totalReceived}, " + s"max round-trip $maxRoundTripMillis ms, " + s"burst size $burstSize, " + s"payload size $payloadSize, " + s"$took ms to deliver $totalReceived messages") + plotRef ! PlotResult().add(testName, throughput * payloadSize / 1024 / 1024) context.stop(self) } @@ -176,6 +178,8 @@ abstract class MaxThroughputSpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor") + var plot = PlotResult() + def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong override def initialParticipants = roles.size @@ -196,6 +200,9 @@ abstract class MaxThroughputSpec override def afterAll(): Unit = { reporterExecutor.shutdown() + runOn(first) { + println(plot.csv(system.name)) + } super.afterAll() } @@ -246,18 +253,25 @@ abstract class MaxThroughputSpec runOn(first) { enterBarrier(receiverName + "-started") + val ignore = TestProbe() val senders = for (n ← 1 to senderReceiverPairs) yield { val receiver = identifyReceiver(receiverName + n) - val snd = system.actorOf(senderProps(receiver, testSettings), testName + "-snd" + n) - val p = TestProbe() - p.watch(snd) + val plotProbe = TestProbe() + val snd = system.actorOf(senderProps(receiver, testSettings, plotProbe.ref), + testName + "-snd" + n) + val terminationProbe = TestProbe() + terminationProbe.watch(snd) snd ! Run - (snd, p) + (snd, terminationProbe, plotProbe) } senders.foreach { - case (snd, p) ⇒ - val t = if (snd == senders.head._1) barrierTimeout else 10.seconds - p.expectTerminated(snd, t) + case (snd, terminationProbe, plotProbe) ⇒ + if (snd == senders.head._1) { + terminationProbe.expectTerminated(snd, barrierTimeout) + val plotResult = plotProbe.expectMsgType[PlotResult] + plot = plot.addAll(plotResult) + } else + terminationProbe.expectTerminated(snd, 10.seconds) } enterBarrier(testName + "-done") } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala new file mode 100644 index 0000000000..08858e62f1 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PlotResult.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +final case class PlotResult(values: Vector[(String, Number)] = Vector.empty) { + + def add(key: String, value: Number): PlotResult = + copy(values = values :+ (key -> value)) + + def addAll(p: PlotResult): PlotResult = + copy(values ++ p.values) + + def csvLabels: String = values.map(_._1).mkString("\"", "\",\"", "\"") + + def csvValues: String = values.map(_._2).mkString("\"", "\",\"", "\"") + + // this can be split to two lines with bash: cut -d':' -f2,3 | tr ':' $'\n' + def csv(name: String): String = s"PLOT_${name}:${csvLabels}:${csvValues}" + +} + +final case class LatencyPlots(plot50: PlotResult = PlotResult(), plot90: PlotResult = PlotResult(), plot99: PlotResult = PlotResult())