Merge pull request #20790 from akka/wip-latency-tests-patriknw

remove burstiness in latency tests
This commit is contained in:
Patrik Nordwall 2016-07-06 21:55:49 +02:00 committed by GitHub
commit 4d749cc563
11 changed files with 281 additions and 435 deletions

View file

@ -3,35 +3,38 @@
*/
package akka.remote.artery
import java.io.File
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicLongArray
import java.util.concurrent.locks.LockSupport
import scala.concurrent.duration._
import akka.Done
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.ActorMaterializer
import akka.stream.KillSwitches
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.testkit._
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.CncFileDescriptor
import io.aeron.driver.MediaDriver
import org.HdrHistogram.Histogram
import java.util.concurrent.atomic.AtomicBoolean
import akka.stream.KillSwitches
import akka.Done
import org.agrona.IoUtil
import java.io.File
import java.io.File
import akka.util.ByteString
import io.aeron.CncFileDescriptor
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
object AeronStreamLatencySpec extends MultiNodeConfig {
val first = role("first")
@ -135,7 +138,7 @@ abstract class AeronStreamLatencySpec
super.afterAll()
}
def printTotal(testName: String, payloadSize: Long, histogram: Histogram, lastRepeat: Boolean): Unit = {
def printTotal(testName: String, payloadSize: Long, histogram: Histogram, totalDurationNanos: Long, lastRepeat: Boolean): Unit = {
import scala.collection.JavaConverters._
val percentiles = histogram.percentiles(5)
def percentile(p: Double): Double =
@ -144,10 +147,13 @@ abstract class AeronStreamLatencySpec
value.getPercentileLevelIteratedTo < (p + 0.5) value.getValueIteratedTo / 1000.0
}.getOrElse(Double.NaN)
val throughput = 1000.0 * histogram.getTotalCount / totalDurationNanos.nanos.toMillis
println(s"=== AeronStreamLatency $testName: RTT " +
f"50%%ile: ${percentile(50.0)}%.0f µs, " +
f"90%%ile: ${percentile(90.0)}%.0f µs, " +
f"99%%ile: ${percentile(99.0)}%.0f µs, ")
f"99%%ile: ${percentile(99.0)}%.0f µs, " +
f"rate: ${throughput}%,.0f msg/s")
println("Histogram of RTT latencies in microseconds.")
histogram.outputPercentileDistribution(System.out, 1000.0)
@ -181,6 +187,11 @@ abstract class AeronStreamLatencySpec
messageRate = 10000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-20000-size-100",
messageRate = 20000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-1000-size-1k",
messageRate = 1000,
@ -200,6 +211,7 @@ abstract class AeronStreamLatencySpec
val rep = reporter(testName)
val barrier = new CyclicBarrier(2)
val count = new AtomicInteger
val startTime = new AtomicLong
val lastRepeat = new AtomicBoolean(false)
val killSwitch = KillSwitches.shared(testName)
val started = TestProbe()
@ -217,7 +229,8 @@ abstract class AeronStreamLatencySpec
val d = System.nanoTime() - sendTimes.get(c - 1)
histogram.recordValue(d)
if (c == totalMessages) {
printTotal(testName, bytes.length, histogram, lastRepeat.get)
val totalDurationNanos = System.nanoTime() - startTime.get
printTotal(testName, bytes.length, histogram, totalDurationNanos, lastRepeat.get)
barrier.await() // this is always the last party
}
}
@ -236,21 +249,53 @@ abstract class AeronStreamLatencySpec
started.expectMsg(Done)
}
for (n 1 to repeat) {
for (rep 1 to repeat) {
histogram.reset()
count.set(0)
lastRepeat.set(n == repeat)
lastRepeat.set(rep == repeat)
Source(1 to totalMessages)
.throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
.map { n
val sendFlow = Flow[Unit]
.map { _
val envelope = pool.acquire()
envelope.byteBuffer.put(payload)
envelope.byteBuffer.flip()
sendTimes.set(n - 1, System.nanoTime())
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
val queueValue = Source.fromGraph(new SendQueue[Unit])
.via(sendFlow)
.to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
.run()
val queue = new ManyToOneConcurrentArrayQueue[Unit](1024)
queueValue.inject(queue)
Thread.sleep(3000) // let materialization complete
startTime.set(System.nanoTime())
var i = 0
var adjust = 0L
// increase the rate somewhat to compensate for overhead, based on heuristics
val adjustRateFactor =
if (messageRate <= 100) 1.05
else if (messageRate <= 1000) 1.1
else if (messageRate <= 10000) 1.2
else if (messageRate <= 20000) 1.3
else 1.4
val targetDelay = (SECONDS.toNanos(1) / (messageRate * adjustRateFactor)).toLong
while (i < totalMessages) {
LockSupport.parkNanos(targetDelay - adjust)
val now = System.nanoTime()
sendTimes.set(i, now)
if (i >= 1) {
val diff = now - sendTimes.get(i - 1)
adjust = math.max(0L, (diff - targetDelay) / 2)
}
if (!queueValue.offer(()))
fail("sendQueue full")
i += 1
}
barrier.await((totalMessages / messageRate) + 10, SECONDS)
}

View file

@ -3,10 +3,9 @@
*/
package akka.remote.artery
import java.net.InetAddress
import java.util.concurrent.Executors
import scala.collection.AbstractIterator
import scala.concurrent.Await
import java.util.concurrent.atomic.AtomicLongArray
import java.util.concurrent.locks.LockSupport
import scala.concurrent.duration._
import akka.actor._
import akka.remote.testconductor.RoleName
@ -14,21 +13,11 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.testkit._
import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLongArray
import org.HdrHistogram.Histogram
import akka.stream.scaladsl.Source
import akka.stream.ThrottleMode
import java.io.StringWriter
import java.io.PrintStream
import java.io.OutputStreamWriter
import java.io.BufferedOutputStream
import java.io.ByteArrayOutputStream
object LatencySpec extends MultiNodeConfig {
val first = role("first")
@ -38,7 +27,7 @@ object LatencySpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3)
# for serious measurements you should increase the totalMessagesFactor (30) and repeatCount (3)
akka.test.LatencySpec.totalMessagesFactor = 1.0
akka.test.LatencySpec.repeatCount = 1
akka {
@ -53,7 +42,7 @@ object LatencySpec extends MultiNodeConfig {
}
remote.artery {
enabled = on
advanced.idle-cpu-level=8
advanced.idle-cpu-level=7
advanced.compression {
enabled = on
@ -92,24 +81,27 @@ object LatencySpec extends MultiNodeConfig {
import settings._
var count = 0
var startTime = System.nanoTime()
val taskRunnerMetrics = new TaskRunnerMetrics(context.system)
def receive = {
case bytes: Array[Byte]
// length 0 is used for warmup
if (bytes.length != 0) {
if (count == 0)
startTime = System.nanoTime()
if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
count += 1
val d = System.nanoTime() - sendTimes.get(count - 1)
histogram.recordValue(d)
if (count == totalMessages) {
printTotal(testName, bytes.length, histogram)
printTotal(testName, bytes.length, histogram, System.nanoTime() - startTime)
context.stop(self)
}
}
}
def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = {
def printTotal(testName: String, payloadSize: Long, histogram: Histogram, totalDurationNanos: Long): Unit = {
import scala.collection.JavaConverters._
val percentiles = histogram.percentiles(5)
def percentile(p: Double): Double =
@ -118,13 +110,18 @@ object LatencySpec extends MultiNodeConfig {
value.getPercentileLevelIteratedTo < (p + 0.5) value.getValueIteratedTo / 1000.0
}.getOrElse(Double.NaN)
val throughput = 1000.0 * histogram.getTotalCount / math.min(1, totalDurationNanos.nanos.toMillis)
println(s"=== Latency $testName: RTT " +
f"50%%ile: ${percentile(50.0)}%.0f µs, " +
f"90%%ile: ${percentile(90.0)}%.0f µs, " +
f"99%%ile: ${percentile(99.0)}%.0f µs, ")
f"99%%ile: ${percentile(99.0)}%.0f µs, " +
f"rate: ${throughput}%,.0f msg/s")
println("Histogram of RTT latencies in microseconds.")
histogram.outputPercentileDistribution(System.out, 1000.0)
taskRunnerMetrics.printHistograms()
val plots = LatencyPlots(
PlotResult().add(testName, percentile(50.0)),
PlotResult().add(testName, percentile(90.0)),
@ -155,23 +152,11 @@ abstract class LatencySpec
var plots = LatencyPlots()
val aeron = {
val ctx = new Aeron.Context
val driver = MediaDriver.launchEmbedded()
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
lazy implicit val mat = ActorMaterializer()(system)
import system.dispatcher
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val a = node(roleName).address
s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
}
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): TestRateReporter = {
val r = new TestRateReporter(name)
@ -195,6 +180,11 @@ abstract class LatencySpec
}
val scenarios = List(
TestSettings(
testName = "warmup",
messageRate = 10000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-100-size-100",
messageRate = 100,
@ -210,6 +200,11 @@ abstract class LatencySpec
messageRate = 10000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-20000-size-100",
messageRate = 20000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-1000-size-1k",
messageRate = 1000,
@ -244,16 +239,33 @@ abstract class LatencySpec
}
warmup.foreach { _
Source(1 to totalMessages)
.throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
.runForeach { n
sendTimes.set(n - 1, System.nanoTime())
echo.tell(payload, receiver)
var i = 0
var adjust = 0L
// increase the rate somewhat to compensate for overhead, based on heuristics
val adjustRateFactor =
if (messageRate <= 100) 1.05
else if (messageRate <= 1000) 1.1
else if (messageRate <= 10000) 1.2
else if (messageRate <= 20000) 1.3
else 1.4
val targetDelay = (SECONDS.toNanos(1) / (messageRate * adjustRateFactor)).toLong
while (i < totalMessages) {
LockSupport.parkNanos(targetDelay - adjust)
val now = System.nanoTime()
sendTimes.set(i, now)
if (i >= 1) {
val diff = now - sendTimes.get(i - 1)
adjust = math.max(0L, (diff - targetDelay) / 2)
}
echo.tell(payload, receiver)
i += 1
}
}
watch(receiver)
expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds)
expectTerminated(receiver, ((totalMessages / messageRate) + 20).seconds)
val p = plotProbe.expectMsgType[LatencyPlots]
// only use the last repeat for the plots
if (n == repeat) {

View file

@ -75,11 +75,12 @@ object MaxThroughputSpec extends MultiNodeConfig {
final case class EndResult(totalReceived: Long)
final case class FlowControl(burstStartTime: Long) extends Echo
def receiverProps(reporter: RateReporter, payloadSize: Int): Props =
Props(new Receiver(reporter, payloadSize)).withDispatcher("akka.remote.default-remote-dispatcher")
def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean): Props =
Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics)).withDispatcher("akka.remote.default-remote-dispatcher")
class Receiver(reporter: RateReporter, payloadSize: Int) extends Actor {
class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean) extends Actor {
private var c = 0L
private val taskRunnerMetrics = new TaskRunnerMetrics(context.system)
def receive = {
case msg: Array[Byte]
@ -90,6 +91,8 @@ object MaxThroughputSpec extends MultiNodeConfig {
c = 0
sender() ! Start
case End
if (printTaskRunnerMetrics)
taskRunnerMetrics.printHistograms()
sender() ! EndResult(c)
context.stop(self)
case m: Echo
@ -98,15 +101,18 @@ object MaxThroughputSpec extends MultiNodeConfig {
}
}
def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef): Props =
Props(new Sender(target, testSettings, plotRef))
def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef,
printTaskRunnerMetrics: Boolean): Props =
Props(new Sender(target, testSettings, plotRef, printTaskRunnerMetrics))
class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef) extends Actor {
class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean)
extends Actor {
import testSettings._
val payload = ("0" * testSettings.payloadSize).getBytes("utf-8")
var startTime = 0L
var remaining = totalMessages
var maxRoundTripMillis = 0L
val taskRunnerMetrics = new TaskRunnerMetrics(context.system)
context.system.eventStream.subscribe(self, classOf[ReceivedActorRefCompressionTable])
@ -176,6 +182,10 @@ object MaxThroughputSpec extends MultiNodeConfig {
s"payload size $payloadSize, " +
s"total size ${totalSize(context.system)}, " +
s"$took ms to deliver $totalReceived messages")
if (printTaskRunnerMetrics)
taskRunnerMetrics.printHistograms()
plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024)
context.stop(self)
@ -232,8 +242,18 @@ object MaxThroughputSpec extends MultiNodeConfig {
case FlowControlManifest FlowControl(buf.getLong)
}
override def toBinary(o: AnyRef): Array[Byte] = ???
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = ???
override def toBinary(o: AnyRef): Array[Byte] = o match {
case FlowControl(burstStartTime)
val buf = ByteBuffer.allocate(8)
toBinary(o, buf)
buf.flip()
val bytes = Array.ofDim[Byte](buf.remaining)
buf.get(bytes)
bytes
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
fromBinary(ByteBuffer.wrap(bytes), manifest)
}
}
@ -319,7 +339,9 @@ abstract class MaxThroughputSpec
runOn(second) {
val rep = reporter(testName)
for (n 1 to senderReceiverPairs) {
val receiver = system.actorOf(receiverProps(rep, payloadSize), receiverName + n)
val receiver = system.actorOf(
receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1),
receiverName + n)
}
enterBarrier(receiverName + "-started")
enterBarrier(testName + "-done")
@ -333,7 +355,7 @@ abstract class MaxThroughputSpec
val receiver = identifyReceiver(receiverName + n)
val plotProbe = TestProbe()
val snd = system.actorOf(
senderProps(receiver, testSettings, plotProbe.ref),
senderProps(receiver, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1),
testName + "-snd" + n)
val terminationProbe = TestProbe()
terminationProbe.watch(snd)

View file

@ -0,0 +1,55 @@
package akka.remote.artery
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.remote.RemoteActorRefProvider
import org.HdrHistogram.Histogram
import java.util.concurrent.TimeUnit.SECONDS
/**
 * Test utility that reads the artery flight recorder's high-frequency event log
 * and prints latency histograms for tasks executed by the shared TaskRunner.
 *
 * Each call to `printHistograms()` processes only the entries appended since the
 * previous call (tracked by `entryOffset`), so it can be invoked repeatedly
 * between test scenarios without double-counting.
 *
 * NOTE(review): `entryOffset` is unsynchronized mutable state — call from a
 * single (test) thread only.
 */
class TaskRunnerMetrics(system: ActorSystem) {
  // number of hi-freq log entries already consumed by earlier calls
  private var entryOffset = 0

  def printHistograms(): Unit = {
    // task durations are recorded in nanoseconds; 10 s upper bound, 3 significant digits
    val aeronSourceHistogram = new Histogram(SECONDS.toNanos(10), 3)
    val aeronSinkHistogram = new Histogram(SECONDS.toNanos(10), 3)
    system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport match {
      case a: ArteryTransport =>
        a.afrFileChannel.foreach { afrFileChannel =>
          var c = 0
          var aeronSourceMaxBeforeDelegate = 0L
          var aeronSinkMaxBeforeDelegate = 0L
          val reader = new FlightRecorderReader(afrFileChannel)
          reader.structure.hiFreqLog.logs.foreach(_.compactEntries.foreach { entry =>
            c += 1
            // skip entries that were already reported by a previous call
            if (c > entryOffset) {
              entry.code match {
                case FlightRecorderEvents.AeronSource_ReturnFromTaskRunner =>
                  // param is the task duration in nanos
                  aeronSourceHistogram.recordValue(entry.param)
                case FlightRecorderEvents.AeronSink_ReturnFromTaskRunner =>
                  aeronSinkHistogram.recordValue(entry.param)
                case FlightRecorderEvents.AeronSource_DelegateToTaskRunner =>
                  // param is the count of successful operations before delegating
                  aeronSourceMaxBeforeDelegate = math.max(aeronSourceMaxBeforeDelegate, entry.param)
                case FlightRecorderEvents.AeronSink_DelegateToTaskRunner =>
                  aeronSinkMaxBeforeDelegate = math.max(aeronSinkMaxBeforeDelegate, entry.param)
                case _ => // other event codes are not relevant here
              }
            }
          })
          entryOffset = c
          if (aeronSourceHistogram.getTotalCount > 0) {
            println(s"Histogram of AeronSource tasks in microseconds. Max count before delegate: $aeronSourceMaxBeforeDelegate")
            // 1000.0 scaling ratio prints nanosecond recordings as microseconds
            aeronSourceHistogram.outputPercentileDistribution(System.out, 1000.0)
          }
          if (aeronSinkHistogram.getTotalCount > 0) {
            println(s"Histogram of AeronSink tasks in microseconds. Max count before delegate: $aeronSinkMaxBeforeDelegate")
            aeronSinkHistogram.outputPercentileDistribution(System.out, 1000.0)
          }
        }
      case _ => // not an artery transport: nothing to report
    }
  }
}

View file

@ -5,7 +5,6 @@ package akka.remote.artery
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
@ -14,7 +13,6 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NoStackTrace
import akka.Done
import akka.stream.Attributes
import akka.stream.Inlet
@ -26,6 +24,7 @@ import akka.stream.stage.InHandler
import io.aeron.Aeron
import io.aeron.Publication
import org.agrona.concurrent.UnsafeBuffer
import org.agrona.hints.ThreadHints
object AeronSink {
@ -95,15 +94,17 @@ class AeronSink(
private var completedValue: Try[Done] = Success(Done)
// FIXME measure and adjust with IdleCpuLevel
private val spinning = 1000
// spin between 2 to 20 depending on idleCpuLevel
private val spinning = 2 * taskRunner.idleCpuLevel
private var backoffCount = spinning
private var lastMsgSize = 0
private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ onOfferSuccess()),
private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ taskOnOfferSuccess()),
giveUpSendAfter, getAsyncCallback(_ onGiveUp()))
private val addOfferTask: Add = Add(offerTask)
private var offerTaskInProgress = false
private var delegateTaskStartTime = 0L
private var countBeforeDelegate = 0L
private val channelMetadata = channel.getBytes("US-ASCII")
@ -135,10 +136,10 @@ class AeronSink(
@tailrec private def publish(): Unit = {
val result = pub.offer(envelopeInFlight.aeronBuffer, 0, lastMsgSize)
// FIXME handle Publication.CLOSED
// TODO the backoff strategy should be measured and tuned
if (result < 0) {
backoffCount -= 1
if (backoffCount > 0) {
ThreadHints.onSpinWait()
publish() // recursive
} else {
// delegate backoff to shared TaskRunner
@ -146,14 +147,22 @@ class AeronSink(
// visibility of these assignments are ensured by adding the task to the command queue
offerTask.buffer = envelopeInFlight.aeronBuffer
offerTask.msgSize = lastMsgSize
delegateTaskStartTime = System.nanoTime()
taskRunner.command(addOfferTask)
flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, lastMsgSize)
flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate)
}
} else {
countBeforeDelegate += 1
onOfferSuccess()
}
}
private def taskOnOfferSuccess(): Unit = {
countBeforeDelegate = 0
flightRecorder.hiFreq(AeronSink_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime)
onOfferSuccess()
}
private def onOfferSuccess(): Unit = {
flightRecorder.hiFreq(AeronSink_EnvelopeOffered, lastMsgSize)
offerTaskInProgress = false

View file

@ -4,10 +4,8 @@
package akka.remote.artery
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.concurrent.duration._
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
@ -22,6 +20,7 @@ import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import org.agrona.DirectBuffer
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.hints.ThreadHints
object AeronSource {
@ -80,18 +79,15 @@ class AeronSource(
new GraphStageLogic(shape) with OutHandler {
private val sub = aeron.addSubscription(channel, streamId)
// FIXME measure and adjust with IdleCpuLevel
private val spinning = 1000
private val yielding = 0
private val parking = 0
private val idleStrategy = new BackoffIdleStrategy(
spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100))
private val idleStrategyRetries = spinning + yielding + parking
private var backoffCount = idleStrategyRetries
// spin between 100 to 10000 depending on idleCpuLevel
private val spinning = 1100 * taskRunner.idleCpuLevel - 1000
private var backoffCount = spinning
private var delegateTaskStartTime = 0L
private var countBeforeDelegate = 0L
// the fragmentHandler is called from `poll` in same thread, i.e. no async callback is needed
private val messageHandler = new MessageHandler(pool)
private val addPollTask: Add = Add(pollTask(sub, messageHandler, getAsyncCallback(onMessage)))
private val addPollTask: Add = Add(pollTask(sub, messageHandler, getAsyncCallback(taskOnMessage)))
private val channelMetadata = channel.getBytes("US-ASCII")
@ -107,8 +103,7 @@ class AeronSource(
// OutHandler
override def onPull(): Unit = {
idleStrategy.reset()
backoffCount = idleStrategyRetries
backoffCount = spinning
subscriberLoop()
}
@ -118,24 +113,31 @@ class AeronSource(
val msg = messageHandler.messageReceived
messageHandler.reset() // for GC
if (fragmentsRead > 0) {
countBeforeDelegate += 1
if (msg ne null)
onMessage(msg)
else
subscriberLoop() // recursive, read more fragments
} else {
// TODO the backoff strategy should be measured and tuned
backoffCount -= 1
if (backoffCount > 0) {
idleStrategy.idle()
ThreadHints.onSpinWait()
subscriberLoop() // recursive
} else {
// delegate backoff to shared TaskRunner
flightRecorder.hiFreq(AeronSource_DelegateToTaskRunner, 0)
flightRecorder.hiFreq(AeronSource_DelegateToTaskRunner, countBeforeDelegate)
delegateTaskStartTime = System.nanoTime()
taskRunner.command(addPollTask)
}
}
}
private def taskOnMessage(data: EnvelopeBuffer): Unit = {
countBeforeDelegate = 0
flightRecorder.hiFreq(AeronSource_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime)
onMessage(data)
}
private def onMessage(data: EnvelopeBuffer): Unit = {
flightRecorder.hiFreq(AeronSource_Received, data.byteBuffer.limit)
push(out, data)

View file

@ -6,6 +6,7 @@ package akka.remote.artery
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MICROSECONDS
import akka.remote.artery.compress.CompressionProtocol.CompressionMessage
import scala.concurrent.Future
@ -415,25 +416,30 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
if (remoteSettings.AeronDirectoryName.nonEmpty)
driverContext.aeronDirectoryName(remoteSettings.AeronDirectoryName)
// FIXME settings from config
driverContext.conductorIdleStrategy()
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20))
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20))
driverContext.driverTimeoutMs(SECONDS.toNanos(20))
if (remoteSettings.IdleCpuLevel == 10) {
val idleCpuLevel = remoteSettings.IdleCpuLevel
if (idleCpuLevel == 10) {
driverContext
.threadingMode(ThreadingMode.DEDICATED)
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1))
.receiverIdleStrategy(new BusySpinIdleStrategy)
.senderIdleStrategy(new BusySpinIdleStrategy)
} else if (remoteSettings.IdleCpuLevel == 1) {
.receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
.senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else if (idleCpuLevel == 1) {
driverContext
.threadingMode(ThreadingMode.SHARED)
//FIXME measure: .sharedIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 200))
} else if (remoteSettings.IdleCpuLevel <= 5) {
.sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else if (idleCpuLevel <= 7) {
driverContext
.threadingMode(ThreadingMode.SHARED_NETWORK)
//FIXME measure: .sharedNetworkIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 20 * (11 - remoteSettings.IdleCpuLevel)))
.sharedNetworkIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else {
driverContext
.threadingMode(ThreadingMode.DEDICATED)
.receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
.senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
}
val driver = MediaDriver.launchEmbedded(driverContext)

View file

@ -28,12 +28,14 @@ object FlightRecorderEvents {
val AeronSink_EnvelopeOffered = 18
val AeronSink_GaveUpEnvelope = 19
val AeronSink_DelegateToTaskRunner = 20
val AeronSink_ReturnFromTaskRunner = 21
// Aeron Source events
val AeronSource_Started = 21
val AeronSource_Stopped = 22
val AeronSource_Received = 23
val AeronSource_DelegateToTaskRunner = 24
val AeronSource_Started = 22
val AeronSource_Stopped = 23
val AeronSource_Received = 24
val AeronSource_DelegateToTaskRunner = 25
val AeronSource_ReturnFromTaskRunner = 26
// Compression events
val Compression_CompressedActorRef = 25

View file

@ -3,7 +3,7 @@
*/
package akka.remote.artery
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MICROSECONDS
import scala.util.control.NonFatal
import akka.actor.ExtendedActorSystem
import akka.dispatch.AbstractNodeQueue
@ -80,12 +80,29 @@ private[akka] object TaskRunner {
override def toString(): String =
elements.filterNot(_ eq null).mkString("[", ",", "]")
}
/**
 * Creates an `IdleStrategy` tuned by `idleCpuLevel`, where 1 trades latency for
 * minimal CPU usage and 10 busy-spins for lowest latency at full CPU usage.
 *
 * @param idleCpuLevel configured `idle-cpu-level`, expected range 1 to 10
 */
def createIdleStrategy(idleCpuLevel: Int): IdleStrategy = {
  if (idleCpuLevel == 1) {
    // lowest CPU usage: brief spin/yield, then park for up to 400 µs
    val maxParkMicros = 400
    new BackoffIdleStrategy(100, 1, MICROSECONDS.toNanos(1), MICROSECONDS.toNanos(maxParkMicros))
  } else if (idleCpuLevel == 10)
    new BusySpinIdleStrategy
  else {
    // levels 2-9: spin between 1200 and 8900 times depending on idleCpuLevel
    val spinning = 1100 * idleCpuLevel - 1000
    val yielding = 5 * idleCpuLevel
    val minParkNanos = 1
    // park between 220 µs (level 2) and 10 µs (level 9) depending on idleCpuLevel
    val maxParkNanos = MICROSECONDS.toNanos(280 - 30 * idleCpuLevel)
    // args: maxSpins, maxYields, minParkPeriodNs, maxParkPeriodNs
    new BackoffIdleStrategy(spinning, yielding, minParkNanos, maxParkNanos)
  }
}
}
/**
* INTERNAL API
*/
private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) extends Runnable {
private[akka] class TaskRunner(system: ExtendedActorSystem, val idleCpuLevel: Int) extends Runnable {
import TaskRunner._
private val log = Logging(system, getClass)
@ -93,21 +110,7 @@ private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) e
private[this] val cmdQueue = new CommandQueue
private[this] val tasks = new ArrayBag[Task]
// TODO the backoff strategy should be measured and tuned
private val idleStrategy: IdleStrategy = {
if (idleCpuLevel == 1) {
val maxParkMicros = 400
new BackoffIdleStrategy(1, 1, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros))
} else if (idleCpuLevel == 10)
new BusySpinIdleStrategy
else {
val spinning = 100000 * idleCpuLevel
val yielding = 2 * idleCpuLevel
val maxParkMicros = 40 * (11 - idleCpuLevel)
new BackoffIdleStrategy(
spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros))
}
}
private val idleStrategy = createIdleStrategy(idleCpuLevel)
private var reset = false
def start(): Unit = {
@ -132,8 +135,8 @@ private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) e
try {
running = true
while (running) {
executeTasks()
processCommand(cmdQueue.poll())
executeTasks()
if (reset) {
reset = false
idleStrategy.reset()

View file

@ -11,6 +11,11 @@ agrona.disable.bounds.checks=true
aeron.threading.mode=SHARED_NETWORK
# low latency settings
#aeron.threading.mode=DEDICATED
#aeron.sender.idle.strategy=org.agrona.concurrent.BusySpinIdleStrategy
#aeron.receiver.idle.strategy=org.agrona.concurrent.BusySpinIdleStrategy
# use same director in akka.remote.artery.advanced.aeron-dir config
# of the Akka application
aeron.dir=target/aeron

View file

@ -1,315 +0,0 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import java.util.concurrent.Executors
import scala.util.Success
import scala.util.Failure
import scala.concurrent.Future
import akka.Done
import org.HdrHistogram.Histogram
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicLongArray
import akka.stream.ThrottleMode
import org.agrona.ErrorHandler
import io.aeron.AvailableImageHandler
import io.aeron.UnavailableImageHandler
import io.aeron.Image
import io.aeron.AvailableImageHandler
import akka.actor.ExtendedActorSystem
import java.io.File
import io.aeron.CncFileDescriptor
object AeronStreamsApp {
val channel1 = "aeron:udp?endpoint=localhost:40123"
val channel2 = "aeron:udp?endpoint=localhost:40124"
val streamId = 1
val throughputN = 10000000
val latencyRate = 10000 // per second
val latencyN = 10 * latencyRate
val payload = ("0" * 100).getBytes("utf-8")
val giveUpSendAfter = 60.seconds
lazy val sendTimes = new AtomicLongArray(latencyN)
lazy val driver = {
val driverContext = new MediaDriver.Context
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.driverTimeoutMs(SECONDS.toNanos(10))
MediaDriver.launchEmbedded(driverContext)
}
lazy val stats = {
new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE)))
}
lazy val aeron = {
val ctx = new Aeron.Context
ctx.errorHandler(new ErrorHandler {
override def onError(cause: Throwable) {
println(s"# Aeron onError " + cause) // FIXME
}
})
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
println(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.unavailableImageHandler(new UnavailableImageHandler {
override def onUnavailableImage(img: Image): Unit = {
println(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
lazy val system = ActorSystem("AeronStreams")
lazy implicit val mat = ActorMaterializer()(system)
val idleCpuLevel = 5
lazy val taskRunner = {
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel)
r.start()
r
}
lazy val reporter = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter {
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
println("%.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format(
messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024)))
}
})
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
lazy val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumFrameSize)
def stopReporter(): Unit = {
reporter.halt()
reporterExecutor.shutdown()
}
def exit(status: Int): Unit = {
stopReporter()
system.scheduler.scheduleOnce(10.seconds) {
mat.shutdown()
system.terminate()
new Thread {
Thread.sleep(3000)
System.exit(status)
}.run()
}(system.dispatcher)
}
lazy val histogram = new Histogram(SECONDS.toNanos(10), 3)
def printTotal(total: Int, pre: String, startTime: Long, payloadSize: Long): Unit = {
val d = (System.nanoTime - startTime).nanos.toMillis
println(f"### $total $pre of size ${payloadSize} bytes took $d ms, " +
f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s")
if (histogram.getTotalCount > 0) {
println("Histogram of RTT latencies in microseconds.")
histogram.outputPercentileDistribution(System.out, 1000.0)
}
}
/**
 * Entry point. The first argument selects the mode; with no arguments both
 * the plain receiver and the plain sender run in this JVM. A second
 * argument "stats" additionally starts periodic Aeron counter dumps.
 */
def main(args: Array[String]): Unit = {
  val mode = args.headOption
  // plain throughput test (both sides when no argument is given)
  if (mode.isEmpty || mode.contains("receiver"))
    runReceiver()
  if (mode.isEmpty || mode.contains("sender"))
    runSender()
  // ping-pong latency test: one JVM is the echo-sender, the other the echo-receiver
  if (mode.contains("echo-sender"))
    runEchoSender()
  if (mode.contains("echo-receiver"))
    runEchoReceiver()
  // debug modes exchange small human-readable text messages
  if (mode.contains("debug-receiver"))
    runDebugReceiver()
  if (mode.contains("debug-sender"))
    runDebugSender()
  if (args.lift(1).contains("stats"))
    runStats()
}
// Receiving side of the plain throughput test. Counts incoming envelopes,
// reports the rate once per second, and prints totals and exits once
// throughputN messages have been received.
def runReceiver(): Unit = {
  import system.dispatcher
  reporterExecutor.execute(reporter)
  val r = reporter
  // start time and payload size are captured from the first received message,
  // since the sender may not start at the same instant as this receiver
  var t0 = System.nanoTime()
  var count = 0L
  var payloadSize = 0L
  Source.fromGraph(new AeronSource(channel1, streamId, aeron, taskRunner, pool, IgnoreEventSink))
    .map { envelope ⇒
      r.onMessage(1, envelope.byteBuffer.limit)
      envelope
    }
    .runForeach { envelope ⇒
      count += 1
      if (count == 1) {
        t0 = System.nanoTime()
        payloadSize = envelope.byteBuffer.limit
      } else if (count == throughputN) {
        // exit only schedules shutdown (10 s delay), so printTotal still runs
        exit(0)
        printTotal(throughputN, "receive", t0, payloadSize)
      }
      // envelopes must be returned to the pool after use
      pool.release(envelope)
    }.onFailure {
      case e ⇒
        e.printStackTrace
        exit(-1)
    }
}
// Sending side of the plain throughput test. Streams throughputN copies of
// the payload to channel1 as fast as the Aeron sink accepts them.
def runSender(): Unit = {
  reporterExecutor.execute(reporter)
  val r = reporter
  val t0 = System.nanoTime()
  Source(1 to throughputN)
    .map { n ⇒
      if (n == throughputN) {
        // exit only schedules shutdown (10 s delay), so printTotal still runs
        exit(0)
        printTotal(throughputN, "send", t0, payload.length)
      }
      n
    }
    .map { _ ⇒
      r.onMessage(1, payload.length)
      // copy the payload into a pooled envelope; the sink releases it after send
      val envelope = pool.acquire()
      envelope.byteBuffer.put(payload)
      envelope.byteBuffer.flip()
      envelope
    }
    .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
}
/**
 * Echo side of the ping-pong latency test: every envelope received on
 * channel1 is sent straight back on channel2, while the per-second rate
 * reporter counts the traffic.
 */
def runEchoReceiver(): Unit = {
  reporterExecutor.execute(reporter)
  val rep = reporter
  val incoming = Source.fromGraph(
    new AeronSource(channel1, streamId, aeron, taskRunner, pool, IgnoreEventSink))
  incoming
    .map { env ⇒
      rep.onMessage(1, env.byteBuffer.limit)
      env
    }
    .runWith(new AeronSink(channel2, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
}
// Originating side of the ping-pong latency test. Sends latencyN messages at
// a throttled rate on channel1 and receives the echoes on channel2, recording
// each round-trip time in the histogram. The whole exchange is repeated 3
// times; a CyclicBarrier synchronizes the end of each round between this
// method's send loop and the receiving stream's callback.
def runEchoSender(): Unit = {
  import system.dispatcher
  reporterExecutor.execute(reporter)
  val r = reporter
  // two parties: the while-loop below and the runForeach callback
  val barrier = new CyclicBarrier(2)
  var repeat = 3
  val count = new AtomicInteger
  var t0 = System.nanoTime()
  Source.fromGraph(new AeronSource(channel2, streamId, aeron, taskRunner, pool, IgnoreEventSink))
    .map { envelope ⇒
      r.onMessage(1, envelope.byteBuffer.limit)
      envelope
    }
    .runForeach { envelope ⇒
      val c = count.incrementAndGet()
      // RTT = now - the time message (c-1) was sent, stored in sendTimes below
      val d = System.nanoTime() - sendTimes.get(c - 1)
      if (c % (latencyN / 10) == 0)
        println(s"# receive offset $c => ${d / 1000} µs") // FIXME
      histogram.recordValue(d)
      if (c == latencyN) {
        printTotal(latencyN, "ping-pong", t0, envelope.byteBuffer.limit)
        barrier.await() // this is always the last party
      }
      pool.release(envelope)
    }.onFailure {
      case e ⇒
        e.printStackTrace
        exit(-1)
    }
  while (repeat > 0) {
    repeat -= 1
    // fresh histogram/counters for every round
    histogram.reset()
    count.set(0)
    t0 = System.nanoTime()
    Source(1 to latencyN)
      // shape the send rate; bursts of at most latencyRate/10 messages
      .throttle(latencyRate, 1.second, latencyRate / 10, ThrottleMode.Shaping)
      .map { n ⇒
        if (n % (latencyN / 10) == 0)
          println(s"# send offset $n") // FIXME
        // record the send time so the echo callback can compute the RTT
        sendTimes.set(n - 1, System.nanoTime())
        val envelope = pool.acquire()
        envelope.byteBuffer.put(payload)
        envelope.byteBuffer.flip()
        envelope
      }
      .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
    barrier.await()
  }
  exit(0)
}
/**
 * Debug helper: decode every envelope arriving on channel1 as a UTF-8
 * string and print it. On stream failure the stack trace is printed and
 * the JVM exits with status -1.
 */
def runDebugReceiver(): Unit = {
  import system.dispatcher
  Source.fromGraph(new AeronSource(channel1, streamId, aeron, taskRunner, pool, IgnoreEventSink))
    .map { env ⇒
      val size = env.byteBuffer.limit
      val bytes = new Array[Byte](size)
      env.byteBuffer.get(bytes)
      // envelope can be released as soon as the bytes are copied out
      pool.release(env)
      new String(bytes, "utf-8")
    }
    .runForeach(msg ⇒ println(msg))
    .onFailure {
      case e ⇒
        e.printStackTrace
        exit(-1)
    }
}
/**
 * Debug helper: send 1000 zero-padded sequence numbers ("0001".."1000") as
 * UTF-8 text on channel1, one message per second, printing each one before
 * it is sent.
 */
def runDebugSender(): Unit = {
  Source(1 to 1000)
    .throttle(1, 1.second, 1, ThrottleMode.Shaping)
    .map { n ⇒
      // zero-pad to 4 digits, same as ("0000" + n).takeRight(4) for 1..1000
      val msg = f"$n%04d"
      println(msg)
      val env = pool.acquire()
      env.byteBuffer.put(msg.getBytes("utf-8"))
      env.byteBuffer.flip()
      env
    }
    .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink))
}
// Dump the Aeron CnC counters to stdout every 10 seconds
def runStats(): Unit = {
  Source.tick(10.second, 10.second, "tick").runForeach { _ ⇒ stats.print(System.out) }
}
}