pekko/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala
Johan Andrén d6c048f59a A simpler ActorRefProvider config #20649 (#20767)
* Provide shorter aliases for the ActorRefProviders #20649
* Use the new actorefprovider aliases throughout code and docs
* Cleaner alias replacement logic
2016-06-10 15:04:13 +02:00

278 lines
8.8 KiB
Scala

/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLongArray
import scala.concurrent.duration._
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source
import akka.testkit._
import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import org.HdrHistogram.Histogram
import java.util.concurrent.atomic.AtomicBoolean
import akka.stream.KillSwitches
import akka.Done
import org.agrona.IoUtil
import java.io.File
import java.io.File
import akka.util.ByteString
import io.aeron.CncFileDescriptor
/**
 * Multi-node configuration for the Aeron stream latency spec: declares the two
 * roles, the common Akka configuration and the per-scenario settings type.
 */
object AeronStreamLatencySpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")

  // Also interpolated into the config below as the testconductor barrier timeout.
  val barrierTimeout = 5.minutes

  commonConfig {
    val base = debugConfig(on = false)
    // Spec-specific settings, used as fallback behind the debug config.
    val specSettings = ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3)
akka.test.AeronStreamLatencySpec.totalMessagesFactor = 1.0
akka.test.AeronStreamLatencySpec.repeatCount = 1
akka {
loglevel = ERROR
testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s
actor {
provider = remote
serialize-creators = false
serialize-messages = false
}
remote.artery.enabled = off
}
""")
    base.withFallback(specSettings)
  }

  /**
   * Parameters of one latency scenario.
   *
   * @param testName    name used for reporting and for the barrier after the scenario
   * @param messageRate target send rate in messages per second
   * @param payloadSize size of each message payload in bytes
   * @param repeat      number of times the scenario is run
   */
  final case class TestSettings(
    testName: String,
    messageRate: Int, // msg/s
    payloadSize: Int,
    repeat: Int)
}
// Concrete subclass of the abstract spec. The `MultiJvmNode1` suffix presumably
// identifies the JVM/node it runs in (multi-jvm naming convention — confirm).
class AeronStreamLatencySpecMultiJvmNode1 extends AeronStreamLatencySpec
// Concrete subclass of the abstract spec. The `MultiJvmNode2` suffix presumably
// identifies the JVM/node it runs in (multi-jvm naming convention — confirm).
class AeronStreamLatencySpecMultiJvmNode2 extends AeronStreamLatencySpec
/**
 * Measures the round-trip latency of messages sent over Aeron streams between two nodes.
 *
 * The `second` node echoes everything it receives back to `first` (see the "start echo" test).
 * The `first` node sends payloads at a throttled rate, records the time between send and
 * receipt of the echo in a histogram and prints the 50/90/99 percentiles per scenario.
 */
abstract class AeronStreamLatencySpec
  extends MultiNodeSpec(AeronStreamLatencySpec)
  with STMultiNodeSpec with ImplicitSender {

  import AeronStreamLatencySpec._

  // Scale factor for the number of messages per run, and number of runs per scenario;
  // both are read from the test configuration.
  val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamLatencySpec.totalMessagesFactor")
  val repeatCount = system.settings.config.getInt("akka.test.AeronStreamLatencySpec.repeatCount")

  // Collected percentile results; replaced with an updated copy in printTotal.
  var plots = LatencyPlots()

  val driver = MediaDriver.launchEmbedded()

  val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)

  // Counters are read from the CnC file inside the embedded driver's directory.
  val stats =
    new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE)))

  // Aeron client connected to the embedded media driver launched above.
  val aeron = {
    val ctx = new Aeron.Context
    ctx.aeronDirectoryName(driver.aeronDirectoryName)
    Aeron.connect(ctx)
  }

  val taskRunner = {
    val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem])
    r.start()
    r
  }

  lazy implicit val mat = ActorMaterializer()(system)
  import system.dispatcher

  override def initialParticipants = roles.size

  /** Aeron UDP channel URI built from the host and port of the given role's node address. */
  def channel(roleName: RoleName) = {
    val a = node(roleName).address
    s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
  }

  val streamId = 1
  val giveUpSendAfter = 30.seconds

  lazy val reporterExecutor = Executors.newFixedThreadPool(1)

  /** Creates a rate reporter with the given name and submits it to the reporter executor. */
  def reporter(name: String): TestRateReporter = {
    val r = new TestRateReporter(name)
    reporterExecutor.execute(r)
    r
  }

  /**
   * Releases the resources created by this spec (reporter executor, task runner, Aeron client,
   * media driver and its directory), prints the collected plots as CSV on the `first` node and
   * then delegates to the super implementation.
   */
  override def afterAll(): Unit = {
    reporterExecutor.shutdown()
    taskRunner.stop()
    aeron.close()
    driver.close()
    IoUtil.delete(new File(driver.aeronDirectoryName), true)
    runOn(first) {
      println(plots.plot50.csv(system.name + "50"))
      println(plots.plot90.csv(system.name + "90"))
      println(plots.plot99.csv(system.name + "99"))
    }
    super.afterAll()
  }

  /**
   * Prints the 50/90/99 percentile round-trip times and the full percentile distribution
   * of the histogram, and adds the percentiles to `plots` when this is the last repeat.
   *
   * @param testName    scenario name used in the output and as plot key
   * @param payloadSize payload size of the scenario (not used by this method)
   * @param histogram   recorded round-trip times; callers in this file record nanoseconds
   * @param lastRepeat  whether this is the final repeat of the scenario
   */
  def printTotal(testName: String, payloadSize: Long, histogram: Histogram, lastRepeat: Boolean): Unit = {
    import scala.collection.JavaConverters._
    val percentiles = histogram.percentiles(5)

    // First iterated value whose percentile level lies within +/- 0.5 of `p`, divided by
    // 1000.0 (nanoseconds to microseconds); NaN when no such value exists.
    def percentile(p: Double): Double =
      percentiles.iterator().asScala.collectFirst {
        case value if (p - 0.5) < value.getPercentileLevelIteratedTo &&
          value.getPercentileLevelIteratedTo < (p + 0.5) => value.getValueIteratedTo / 1000.0
      }.getOrElse(Double.NaN)

    // Computed once and reused for both the printout and the plots.
    val p50 = percentile(50.0)
    val p90 = percentile(90.0)
    val p99 = percentile(99.0)

    println(s"=== AeronStreamLatency $testName: RTT " +
      f"50%%ile: ${p50}%.0f µs, " +
      f"90%%ile: ${p90}%.0f µs, " +
      f"99%%ile: ${p99}%.0f µs, ")
    println("Histogram of RTT latencies in microseconds.")
    histogram.outputPercentileDistribution(System.out, 1000.0)

    // only use the last repeat for the plots
    if (lastRepeat) {
      plots = plots.copy(
        plot50 = plots.plot50.add(testName, p50),
        plot90 = plots.plot90.add(testName, p90),
        plot99 = plots.plot99.add(testName, p99))
    }
  }

  /** Prints the Aeron counters to stdout, prefixed with the given side name. */
  def printStats(side: String): Unit = {
    println(side + " stats:")
    stats.print(System.out)
  }

  val scenarios = List(
    TestSettings(
      testName = "rate-100-size-100",
      messageRate = 100,
      payloadSize = 100,
      repeat = repeatCount),
    TestSettings(
      testName = "rate-1000-size-100",
      messageRate = 1000,
      payloadSize = 100,
      repeat = repeatCount),
    TestSettings(
      testName = "rate-10000-size-100",
      messageRate = 10000,
      payloadSize = 100,
      repeat = repeatCount),
    TestSettings(
      testName = "rate-1000-size-1k",
      messageRate = 1000,
      payloadSize = 1000,
      repeat = repeatCount))

  /**
   * Runs one latency scenario.
   *
   * On the `first` node: starts a receiver that records the round-trip time of every echoed
   * payload, sends one-byte start messages until one is echoed back, then sends
   * `totalMessages` payloads per repeat at the configured rate and waits for all echoes.
   * Both nodes then print their Aeron stats and meet at the "after-" barrier.
   */
  def test(testSettings: TestSettings): Unit = {
    import testSettings._

    runOn(first) {
      val payload = ("1" * payloadSize).getBytes("utf-8")
      // by default run for 2 seconds, but can be adjusted with the totalMessagesFactor
      val totalMessages = (2 * messageRate * totalMessagesFactor).toInt
      // Send timestamp (System.nanoTime) per message, indexed by message number - 1.
      val sendTimes = new AtomicLongArray(totalMessages)
      val histogram = new Histogram(SECONDS.toNanos(10), 3)

      val rep = reporter(testName)
      // Two parties: the receiving stream (after the last message) and the sending test thread.
      val barrier = new CyclicBarrier(2)
      val count = new AtomicInteger
      val lastRepeat = new AtomicBoolean(false)
      val killSwitch = KillSwitches.shared(testName)
      val started = TestProbe()
      val startMsg = "0".getBytes("utf-8")

      // Receiver: handles the echoes coming back from the second node.
      Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink))
        .via(killSwitch.flow)
        .runForeach { envelope =>
          val bytes = ByteString.fromByteBuffer(envelope.byteBuffer)
          if (bytes.length == 1 && bytes(0) == startMsg(0))
            started.ref ! Done
          else {
            if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message")
            rep.onMessage(1, payloadSize)
            val c = count.incrementAndGet()
            // The c:th received echo is matched with the send time of the c:th sent message.
            val d = System.nanoTime() - sendTimes.get(c - 1)
            histogram.recordValue(d)
            if (c == totalMessages) {
              printTotal(testName, bytes.length, histogram, lastRepeat.get)
              barrier.await() // this is always the last party
            }
          }
          pool.release(envelope)
        }

      // Send up to 50 start messages, one every 200 ms, and wait for the first echoed one.
      within(10.seconds) {
        Source(1 to 50).map { _ =>
          val envelope = pool.acquire()
          envelope.byteBuffer.put(startMsg)
          envelope.byteBuffer.flip()
          envelope
        }
          .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
          .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
        started.expectMsg(Done)
      }

      for (n <- 1 to repeat) {
        histogram.reset()
        count.set(0)
        lastRepeat.set(n == repeat)

        Source(1 to totalMessages)
          .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
          .map { msgNr =>
            val envelope = pool.acquire()
            envelope.byteBuffer.put(payload)
            envelope.byteBuffer.flip()
            sendTimes.set(msgNr - 1, System.nanoTime())
            envelope
          }
          .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))

        // Wait until the receiver has seen all echoes: expected send duration plus 10 seconds.
        barrier.await((totalMessages / messageRate) + 10, SECONDS)
      }

      killSwitch.shutdown()
      rep.halt()
    }

    printStats(myself.name)
    enterBarrier("after-" + testName)
  }

  "Latency of Aeron Streams" must {

    "start echo" in {
      runOn(second) {
        // just echo back
        Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink))
          .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
      }
      enterBarrier("echo-started")
    }

    for (s <- scenarios) {
      s"be low for ${s.testName}, at ${s.messageRate} msg/s, payloadSize = ${s.payloadSize}" in test(s)
    }

  }
}