diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 9dcf1c7167..240a1c1c2f 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -3,35 +3,38 @@ */ package akka.remote.artery +import java.io.File import java.util.concurrent.CyclicBarrier import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLongArray +import java.util.concurrent.locks.LockSupport import scala.concurrent.duration._ + +import akka.Done import akka.actor._ import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.stream.ActorMaterializer +import akka.stream.KillSwitches import akka.stream.ThrottleMode +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.testkit._ +import akka.util.ByteString import com.typesafe.config.ConfigFactory import io.aeron.Aeron +import io.aeron.CncFileDescriptor import io.aeron.driver.MediaDriver import org.HdrHistogram.Histogram -import java.util.concurrent.atomic.AtomicBoolean - -import akka.stream.KillSwitches -import akka.Done import org.agrona.IoUtil -import java.io.File -import java.io.File - -import akka.util.ByteString -import io.aeron.CncFileDescriptor +import org.agrona.concurrent.ManyToOneConcurrentArrayQueue object AeronStreamLatencySpec extends MultiNodeConfig { val first = role("first") @@ -135,7 +138,7 @@ abstract class AeronStreamLatencySpec super.afterAll() } - def printTotal(testName: String, payloadSize: Long, histogram: Histogram, 
lastRepeat: Boolean): Unit = { + def printTotal(testName: String, payloadSize: Long, histogram: Histogram, totalDurationNanos: Long, lastRepeat: Boolean): Unit = { import scala.collection.JavaConverters._ val percentiles = histogram.percentiles(5) def percentile(p: Double): Double = @@ -144,10 +147,13 @@ abstract class AeronStreamLatencySpec value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0 }.getOrElse(Double.NaN) + val throughput = 1000.0 * histogram.getTotalCount / totalDurationNanos.nanos.toMillis + println(s"=== AeronStreamLatency $testName: RTT " + f"50%%ile: ${percentile(50.0)}%.0f µs, " + f"90%%ile: ${percentile(90.0)}%.0f µs, " + - f"99%%ile: ${percentile(99.0)}%.0f µs, ") + f"99%%ile: ${percentile(99.0)}%.0f µs, " + + f"rate: ${throughput}%,.0f msg/s") println("Histogram of RTT latencies in microseconds.") histogram.outputPercentileDistribution(System.out, 1000.0) @@ -181,6 +187,11 @@ abstract class AeronStreamLatencySpec messageRate = 10000, payloadSize = 100, repeat = repeatCount), + TestSettings( + testName = "rate-20000-size-100", + messageRate = 20000, + payloadSize = 100, + repeat = repeatCount), TestSettings( testName = "rate-1000-size-1k", messageRate = 1000, @@ -200,6 +211,7 @@ abstract class AeronStreamLatencySpec val rep = reporter(testName) val barrier = new CyclicBarrier(2) val count = new AtomicInteger + val startTime = new AtomicLong val lastRepeat = new AtomicBoolean(false) val killSwitch = KillSwitches.shared(testName) val started = TestProbe() @@ -217,7 +229,8 @@ abstract class AeronStreamLatencySpec val d = System.nanoTime() - sendTimes.get(c - 1) histogram.recordValue(d) if (c == totalMessages) { - printTotal(testName, bytes.length, histogram, lastRepeat.get) + val totalDurationNanos = System.nanoTime() - startTime.get + printTotal(testName, bytes.length, histogram, totalDurationNanos, lastRepeat.get) barrier.await() // this is always the last party } } @@ -236,21 +249,53 @@ abstract class 
AeronStreamLatencySpec started.expectMsg(Done) } - for (n ← 1 to repeat) { + for (rep ← 1 to repeat) { histogram.reset() count.set(0) - lastRepeat.set(n == repeat) + lastRepeat.set(rep == repeat) - Source(1 to totalMessages) - .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) - .map { n ⇒ + val sendFlow = Flow[Unit] + .map { _ ⇒ val envelope = pool.acquire() envelope.byteBuffer.put(payload) envelope.byteBuffer.flip() - sendTimes.set(n - 1, System.nanoTime()) envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + + val queueValue = Source.fromGraph(new SendQueue[Unit]) + .via(sendFlow) + .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .run() + + val queue = new ManyToOneConcurrentArrayQueue[Unit](1024) + queueValue.inject(queue) + Thread.sleep(3000) // let materialization complete + + startTime.set(System.nanoTime()) + + var i = 0 + var adjust = 0L + // increase the rate somewhat to compensate for overhead, based on heuristics + val adjustRateFactor = + if (messageRate <= 100) 1.05 + else if (messageRate <= 1000) 1.1 + else if (messageRate <= 10000) 1.2 + else if (messageRate <= 20000) 1.3 + else 1.4 + val targetDelay = (SECONDS.toNanos(1) / (messageRate * adjustRateFactor)).toLong + while (i < totalMessages) { + LockSupport.parkNanos(targetDelay - adjust) + val now = System.nanoTime() + sendTimes.set(i, now) + if (i >= 1) { + val diff = now - sendTimes.get(i - 1) + adjust = math.max(0L, (diff - targetDelay) / 2) + } + + if (!queueValue.offer(())) + fail("sendQueue full") + i += 1 + } barrier.await((totalMessages / messageRate) + 10, SECONDS) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index 7746658e89..185221267c 100644 --- 
a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -3,10 +3,9 @@ */ package akka.remote.artery -import java.net.InetAddress import java.util.concurrent.Executors -import scala.collection.AbstractIterator -import scala.concurrent.Await +import java.util.concurrent.atomic.AtomicLongArray +import java.util.concurrent.locks.LockSupport import scala.concurrent.duration._ import akka.actor._ import akka.remote.testconductor.RoleName @@ -14,21 +13,11 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Source import akka.testkit._ import com.typesafe.config.ConfigFactory -import io.aeron.Aeron -import io.aeron.driver.MediaDriver -import java.util.concurrent.CyclicBarrier -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicLongArray import org.HdrHistogram.Histogram +import akka.stream.scaladsl.Source import akka.stream.ThrottleMode -import java.io.StringWriter -import java.io.PrintStream -import java.io.OutputStreamWriter -import java.io.BufferedOutputStream -import java.io.ByteArrayOutputStream object LatencySpec extends MultiNodeConfig { val first = role("first") @@ -38,7 +27,7 @@ object LatencySpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" - # for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3) + # for serious measurements you should increase the totalMessagesFactor (30) and repeatCount (3) akka.test.LatencySpec.totalMessagesFactor = 1.0 akka.test.LatencySpec.repeatCount = 1 akka { @@ -53,7 +42,7 @@ object LatencySpec extends MultiNodeConfig { } remote.artery { enabled = on - advanced.idle-cpu-level=8 + advanced.idle-cpu-level=7 advanced.compression { enabled = on @@ -92,24 
+81,27 @@ object LatencySpec extends MultiNodeConfig { import settings._ var count = 0 + var startTime = System.nanoTime() + val taskRunnerMetrics = new TaskRunnerMetrics(context.system) def receive = { case bytes: Array[Byte] ⇒ - // length 0 is used for warmup if (bytes.length != 0) { + if (count == 0) + startTime = System.nanoTime() if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") reporter.onMessage(1, payloadSize) count += 1 val d = System.nanoTime() - sendTimes.get(count - 1) histogram.recordValue(d) if (count == totalMessages) { - printTotal(testName, bytes.length, histogram) + printTotal(testName, bytes.length, histogram, System.nanoTime() - startTime) context.stop(self) } } } - def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = { + def printTotal(testName: String, payloadSize: Long, histogram: Histogram, totalDurationNanos: Long): Unit = { import scala.collection.JavaConverters._ val percentiles = histogram.percentiles(5) def percentile(p: Double): Double = @@ -118,13 +110,18 @@ object LatencySpec extends MultiNodeConfig { value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0 }.getOrElse(Double.NaN) + val throughput = 1000.0 * histogram.getTotalCount / math.max(1, totalDurationNanos.nanos.toMillis) + println(s"=== Latency $testName: RTT " + f"50%%ile: ${percentile(50.0)}%.0f µs, " + f"90%%ile: ${percentile(90.0)}%.0f µs, " + - f"99%%ile: ${percentile(99.0)}%.0f µs, ") + f"99%%ile: ${percentile(99.0)}%.0f µs, " + + f"rate: ${throughput}%,.0f msg/s") println("Histogram of RTT latencies in microseconds.") histogram.outputPercentileDistribution(System.out, 1000.0) + taskRunnerMetrics.printHistograms() + val plots = LatencyPlots( PlotResult().add(testName, percentile(50.0)), PlotResult().add(testName, percentile(90.0)), @@ -155,23 +152,11 @@ abstract class LatencySpec var plots = LatencyPlots() - val aeron = { - val ctx = new Aeron.Context - val driver = 
MediaDriver.launchEmbedded() - ctx.aeronDirectoryName(driver.aeronDirectoryName) - Aeron.connect(ctx) - } - lazy implicit val mat = ActorMaterializer()(system) import system.dispatcher override def initialParticipants = roles.size - def channel(roleName: RoleName) = { - val a = node(roleName).address - s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - } - lazy val reporterExecutor = Executors.newFixedThreadPool(1) def reporter(name: String): TestRateReporter = { val r = new TestRateReporter(name) @@ -195,6 +180,11 @@ abstract class LatencySpec } val scenarios = List( + TestSettings( + testName = "warmup", + messageRate = 10000, + payloadSize = 100, + repeat = repeatCount), TestSettings( testName = "rate-100-size-100", messageRate = 100, @@ -210,6 +200,11 @@ abstract class LatencySpec messageRate = 10000, payloadSize = 100, repeat = repeatCount), + TestSettings( + testName = "rate-20000-size-100", + messageRate = 20000, + payloadSize = 100, + repeat = repeatCount), TestSettings( testName = "rate-1000-size-1k", messageRate = 1000, @@ -244,16 +239,33 @@ abstract class LatencySpec } warmup.foreach { _ ⇒ - Source(1 to totalMessages) - .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) - .runForeach { n ⇒ - sendTimes.set(n - 1, System.nanoTime()) - echo.tell(payload, receiver) + + var i = 0 + var adjust = 0L + // increase the rate somewhat to compensate for overhead, based on heuristics + val adjustRateFactor = + if (messageRate <= 100) 1.05 + else if (messageRate <= 1000) 1.1 + else if (messageRate <= 10000) 1.2 + else if (messageRate <= 20000) 1.3 + else 1.4 + val targetDelay = (SECONDS.toNanos(1) / (messageRate * adjustRateFactor)).toLong + while (i < totalMessages) { + LockSupport.parkNanos(targetDelay - adjust) + val now = System.nanoTime() + sendTimes.set(i, now) + if (i >= 1) { + val diff = now - sendTimes.get(i - 1) + adjust = math.max(0L, (diff - targetDelay) / 2) } + + echo.tell(payload, receiver) + i += 1 + } } 
watch(receiver) - expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds) + expectTerminated(receiver, ((totalMessages / messageRate) + 20).seconds) val p = plotProbe.expectMsgType[LatencyPlots] // only use the last repeat for the plots if (n == repeat) { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 50eeb1035e..0ad6c1c2bb 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -75,11 +75,12 @@ object MaxThroughputSpec extends MultiNodeConfig { final case class EndResult(totalReceived: Long) final case class FlowControl(burstStartTime: Long) extends Echo - def receiverProps(reporter: RateReporter, payloadSize: Int): Props = - Props(new Receiver(reporter, payloadSize)).withDispatcher("akka.remote.default-remote-dispatcher") + def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean): Props = + Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics)).withDispatcher("akka.remote.default-remote-dispatcher") - class Receiver(reporter: RateReporter, payloadSize: Int) extends Actor { + class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean) extends Actor { private var c = 0L + private val taskRunnerMetrics = new TaskRunnerMetrics(context.system) def receive = { case msg: Array[Byte] ⇒ @@ -90,6 +91,8 @@ object MaxThroughputSpec extends MultiNodeConfig { c = 0 sender() ! Start case End ⇒ + if (printTaskRunnerMetrics) + taskRunnerMetrics.printHistograms() sender() ! 
EndResult(c) context.stop(self) case m: Echo ⇒ @@ -98,15 +101,18 @@ object MaxThroughputSpec extends MultiNodeConfig { } } - def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef): Props = - Props(new Sender(target, testSettings, plotRef)) + def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, + printTaskRunnerMetrics: Boolean): Props = + Props(new Sender(target, testSettings, plotRef, printTaskRunnerMetrics)) - class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef) extends Actor { + class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean) + extends Actor { import testSettings._ val payload = ("0" * testSettings.payloadSize).getBytes("utf-8") var startTime = 0L var remaining = totalMessages var maxRoundTripMillis = 0L + val taskRunnerMetrics = new TaskRunnerMetrics(context.system) context.system.eventStream.subscribe(self, classOf[ReceivedActorRefCompressionTable]) @@ -176,6 +182,10 @@ object MaxThroughputSpec extends MultiNodeConfig { s"payload size $payloadSize, " + s"total size ${totalSize(context.system)}, " + s"$took ms to deliver $totalReceived messages") + + if (printTaskRunnerMetrics) + taskRunnerMetrics.printHistograms() + plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024) context.stop(self) @@ -232,8 +242,18 @@ object MaxThroughputSpec extends MultiNodeConfig { case FlowControlManifest ⇒ FlowControl(buf.getLong) } - override def toBinary(o: AnyRef): Array[Byte] = ??? - override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = ??? 
+ override def toBinary(o: AnyRef): Array[Byte] = o match { + case FlowControl(burstStartTime) ⇒ + val buf = ByteBuffer.allocate(8) + toBinary(o, buf) + buf.flip() + val bytes = Array.ofDim[Byte](buf.remaining) + buf.get(bytes) + bytes + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = + fromBinary(ByteBuffer.wrap(bytes), manifest) } } @@ -319,7 +339,9 @@ abstract class MaxThroughputSpec runOn(second) { val rep = reporter(testName) for (n ← 1 to senderReceiverPairs) { - val receiver = system.actorOf(receiverProps(rep, payloadSize), receiverName + n) + val receiver = system.actorOf( + receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1), + receiverName + n) } enterBarrier(receiverName + "-started") enterBarrier(testName + "-done") @@ -333,7 +355,7 @@ abstract class MaxThroughputSpec val receiver = identifyReceiver(receiverName + n) val plotProbe = TestProbe() val snd = system.actorOf( - senderProps(receiver, testSettings, plotProbe.ref), + senderProps(receiver, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1), testName + "-snd" + n) val terminationProbe = TestProbe() terminationProbe.watch(snd) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TaskRunnerMetrics.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TaskRunnerMetrics.scala new file mode 100644 index 0000000000..f939669d81 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TaskRunnerMetrics.scala @@ -0,0 +1,55 @@ +package akka.remote.artery + +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.remote.RemoteActorRefProvider +import org.HdrHistogram.Histogram +import java.util.concurrent.TimeUnit.SECONDS + +class TaskRunnerMetrics(system: ActorSystem) { + + private var entryOffset = 0 + + def printHistograms(): Unit = { + val aeronSourceHistogram = new Histogram(SECONDS.toNanos(10), 3) + val aeronSinkHistogram = new Histogram(SECONDS.toNanos(10), 3) + 
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport match { + case a: ArteryTransport ⇒ + a.afrFileChannel.foreach { afrFileChannel ⇒ + var c = 0 + var aeronSourceMaxBeforeDelegate = 0L + var aeronSinkMaxBeforeDelegate = 0L + val reader = new FlightRecorderReader(afrFileChannel) + reader.structure.hiFreqLog.logs.foreach(_.compactEntries.foreach { entry ⇒ + c += 1 + if (c > entryOffset) { + entry.code match { + case FlightRecorderEvents.AeronSource_ReturnFromTaskRunner ⇒ + aeronSourceHistogram.recordValue(entry.param) + case FlightRecorderEvents.AeronSink_ReturnFromTaskRunner ⇒ + aeronSinkHistogram.recordValue(entry.param) + case FlightRecorderEvents.AeronSource_DelegateToTaskRunner ⇒ + aeronSourceMaxBeforeDelegate = math.max(aeronSourceMaxBeforeDelegate, entry.param) + case FlightRecorderEvents.AeronSink_DelegateToTaskRunner ⇒ + aeronSinkMaxBeforeDelegate = math.max(aeronSinkMaxBeforeDelegate, entry.param) + case _ ⇒ + } + } + }) + entryOffset = c + + if (aeronSourceHistogram.getTotalCount > 0) { + println(s"Histogram of AeronSource tasks in microseconds. Max count before delegate: $aeronSourceMaxBeforeDelegate") + aeronSourceHistogram.outputPercentileDistribution(System.out, 1000.0) + } + + if (aeronSinkHistogram.getTotalCount > 0) { + println(s"Histogram of AeronSink tasks in microseconds. 
Max count before delegate: $aeronSinkMaxBeforeDelegate") + aeronSinkHistogram.outputPercentileDistribution(System.out, 1000.0) + } + } + case _ ⇒ + } + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index a580e39834..f93a6ab013 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -5,7 +5,6 @@ package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger - import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise @@ -14,7 +13,6 @@ import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.NoStackTrace - import akka.Done import akka.stream.Attributes import akka.stream.Inlet @@ -26,6 +24,7 @@ import akka.stream.stage.InHandler import io.aeron.Aeron import io.aeron.Publication import org.agrona.concurrent.UnsafeBuffer +import org.agrona.hints.ThreadHints object AeronSink { @@ -95,15 +94,17 @@ class AeronSink( private var completedValue: Try[Done] = Success(Done) - // FIXME measure and adjust with IdleCpuLevel - private val spinning = 1000 + // spin between 2 to 20 depending on idleCpuLevel + private val spinning = 2 * taskRunner.idleCpuLevel private var backoffCount = spinning private var lastMsgSize = 0 - private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ ⇒ onOfferSuccess()), + private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ ⇒ taskOnOfferSuccess()), giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp())) private val addOfferTask: Add = Add(offerTask) private var offerTaskInProgress = false + private var delegateTaskStartTime = 0L + private var countBeforeDelegate = 0L private val channelMetadata = channel.getBytes("US-ASCII") @@ -135,10 +136,10 @@ class AeronSink( @tailrec private def publish(): Unit = { 
val result = pub.offer(envelopeInFlight.aeronBuffer, 0, lastMsgSize) // FIXME handle Publication.CLOSED - // TODO the backoff strategy should be measured and tuned if (result < 0) { backoffCount -= 1 if (backoffCount > 0) { + ThreadHints.onSpinWait() publish() // recursive } else { // delegate backoff to shared TaskRunner @@ -146,14 +147,22 @@ class AeronSink( // visibility of these assignments are ensured by adding the task to the command queue offerTask.buffer = envelopeInFlight.aeronBuffer offerTask.msgSize = lastMsgSize + delegateTaskStartTime = System.nanoTime() taskRunner.command(addOfferTask) - flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, lastMsgSize) + flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate) } } else { + countBeforeDelegate += 1 onOfferSuccess() } } + private def taskOnOfferSuccess(): Unit = { + countBeforeDelegate = 0 + flightRecorder.hiFreq(AeronSink_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime) + onOfferSuccess() + } + private def onOfferSuccess(): Unit = { flightRecorder.hiFreq(AeronSink_EnvelopeOffered, lastMsgSize) offerTaskInProgress = false diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index 629ddf5b66..a6133d0104 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -4,10 +4,8 @@ package akka.remote.artery import java.util.concurrent.TimeUnit - import scala.annotation.tailrec import scala.concurrent.duration._ - import akka.stream.Attributes import akka.stream.Outlet import akka.stream.SourceShape @@ -22,6 +20,7 @@ import io.aeron.logbuffer.FragmentHandler import io.aeron.logbuffer.Header import org.agrona.DirectBuffer import org.agrona.concurrent.BackoffIdleStrategy +import org.agrona.hints.ThreadHints object AeronSource { @@ -80,18 +79,15 @@ class AeronSource( new GraphStageLogic(shape) with 
OutHandler { private val sub = aeron.addSubscription(channel, streamId) - // FIXME measure and adjust with IdleCpuLevel - private val spinning = 1000 - private val yielding = 0 - private val parking = 0 - private val idleStrategy = new BackoffIdleStrategy( - spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100)) - private val idleStrategyRetries = spinning + yielding + parking - private var backoffCount = idleStrategyRetries + // spin between 100 to 10000 depending on idleCpuLevel + private val spinning = 1100 * taskRunner.idleCpuLevel - 1000 + private var backoffCount = spinning + private var delegateTaskStartTime = 0L + private var countBeforeDelegate = 0L // the fragmentHandler is called from `poll` in same thread, i.e. no async callback is needed private val messageHandler = new MessageHandler(pool) - private val addPollTask: Add = Add(pollTask(sub, messageHandler, getAsyncCallback(onMessage))) + private val addPollTask: Add = Add(pollTask(sub, messageHandler, getAsyncCallback(taskOnMessage))) private val channelMetadata = channel.getBytes("US-ASCII") @@ -107,8 +103,7 @@ class AeronSource( // OutHandler override def onPull(): Unit = { - idleStrategy.reset() - backoffCount = idleStrategyRetries + backoffCount = spinning subscriberLoop() } @@ -118,24 +113,31 @@ class AeronSource( val msg = messageHandler.messageReceived messageHandler.reset() // for GC if (fragmentsRead > 0) { + countBeforeDelegate += 1 if (msg ne null) onMessage(msg) else subscriberLoop() // recursive, read more fragments } else { - // TODO the backoff strategy should be measured and tuned backoffCount -= 1 if (backoffCount > 0) { - idleStrategy.idle() + ThreadHints.onSpinWait() subscriberLoop() // recursive } else { // delegate backoff to shared TaskRunner - flightRecorder.hiFreq(AeronSource_DelegateToTaskRunner, 0) + flightRecorder.hiFreq(AeronSource_DelegateToTaskRunner, countBeforeDelegate) + delegateTaskStartTime = System.nanoTime() 
taskRunner.command(addPollTask) } } } + private def taskOnMessage(data: EnvelopeBuffer): Unit = { + countBeforeDelegate = 0 + flightRecorder.hiFreq(AeronSource_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime) + onMessage(data) + } + private def onMessage(data: EnvelopeBuffer): Unit = { flightRecorder.hiFreq(AeronSource_Received, data.byteBuffer.limit) push(out, data) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b94fbb38a1..92fa84e5da 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -6,6 +6,7 @@ package akka.remote.artery import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit.MICROSECONDS import akka.remote.artery.compress.CompressionProtocol.CompressionMessage import scala.concurrent.Future @@ -415,25 +416,30 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R if (remoteSettings.AeronDirectoryName.nonEmpty) driverContext.aeronDirectoryName(remoteSettings.AeronDirectoryName) // FIXME settings from config - driverContext.conductorIdleStrategy() driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20)) driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20)) driverContext.driverTimeoutMs(SECONDS.toNanos(20)) - if (remoteSettings.IdleCpuLevel == 10) { + val idleCpuLevel = remoteSettings.IdleCpuLevel + if (idleCpuLevel == 10) { driverContext .threadingMode(ThreadingMode.DEDICATED) .conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1)) - .receiverIdleStrategy(new BusySpinIdleStrategy) - .senderIdleStrategy(new BusySpinIdleStrategy) - } else if (remoteSettings.IdleCpuLevel == 1) { + .receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + .senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + } else if 
(idleCpuLevel == 1) { driverContext .threadingMode(ThreadingMode.SHARED) - //FIXME measure: .sharedIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 200)) - } else if (remoteSettings.IdleCpuLevel <= 5) { + .sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + } else if (idleCpuLevel <= 7) { driverContext .threadingMode(ThreadingMode.SHARED_NETWORK) - //FIXME measure: .sharedNetworkIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 20 * (11 - remoteSettings.IdleCpuLevel))) + .sharedNetworkIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + } else { + driverContext + .threadingMode(ThreadingMode.DEDICATED) + .receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + .senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) } val driver = MediaDriver.launchEmbedded(driverContext) diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala index 93d034622c..4d1497025c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala @@ -28,12 +28,14 @@ object FlightRecorderEvents { val AeronSink_EnvelopeOffered = 18 val AeronSink_GaveUpEnvelope = 19 val AeronSink_DelegateToTaskRunner = 20 + val AeronSink_ReturnFromTaskRunner = 21 // Aeron Source events - val AeronSource_Started = 21 - val AeronSource_Stopped = 22 - val AeronSource_Received = 23 - val AeronSource_DelegateToTaskRunner = 24 + val AeronSource_Started = 22 + val AeronSource_Stopped = 23 + val AeronSource_Received = 24 + val AeronSource_DelegateToTaskRunner = 25 + val AeronSource_ReturnFromTaskRunner = 26 // Compression events val Compression_CompressedActorRef = 25 diff --git a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala index 6778260759..a8e1d67750 100644 --- 
a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala @@ -3,7 +3,7 @@ */ package akka.remote.artery -import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit.MICROSECONDS import scala.util.control.NonFatal import akka.actor.ExtendedActorSystem import akka.dispatch.AbstractNodeQueue @@ -80,12 +80,29 @@ private[akka] object TaskRunner { override def toString(): String = elements.filterNot(_ eq null).mkString("[", ",", "]") } + + def createIdleStrategy(idleCpuLevel: Int): IdleStrategy = { + if (idleCpuLevel == 1) { + val maxParkMicros = 400 + new BackoffIdleStrategy(100, 1, MICROSECONDS.toNanos(1), MICROSECONDS.toNanos(maxParkMicros)) + } else if (idleCpuLevel == 10) + new BusySpinIdleStrategy + else { + // spin between 100 to 10000 depending on idleCpuLevel + val spinning = 1100 * idleCpuLevel - 1000 + val yielding = 5 * idleCpuLevel + val minParkNanos = 1 + // park between 250 and 10 micros depending on idleCpuLevel + val maxParkNanos = MICROSECONDS.toNanos(280 - 30 * idleCpuLevel) + new BackoffIdleStrategy(spinning, yielding, 1, maxParkNanos) + } + } } /** * INTERNAL API */ -private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) extends Runnable { +private[akka] class TaskRunner(system: ExtendedActorSystem, val idleCpuLevel: Int) extends Runnable { import TaskRunner._ private val log = Logging(system, getClass) @@ -93,21 +110,7 @@ private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) e private[this] val cmdQueue = new CommandQueue private[this] val tasks = new ArrayBag[Task] - // TODO the backoff strategy should be measured and tuned - private val idleStrategy: IdleStrategy = { - if (idleCpuLevel == 1) { - val maxParkMicros = 400 - new BackoffIdleStrategy(1, 1, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros)) - } else if (idleCpuLevel == 10) - new BusySpinIdleStrategy - else { - val 
spinning = 100000 * idleCpuLevel - val yielding = 2 * idleCpuLevel - val maxParkMicros = 40 * (11 - idleCpuLevel) - new BackoffIdleStrategy( - spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros)) - } - } + private val idleStrategy = createIdleStrategy(idleCpuLevel) private var reset = false def start(): Unit = { @@ -132,8 +135,8 @@ private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) e try { running = true while (running) { - executeTasks() processCommand(cmdQueue.poll()) + executeTasks() if (reset) { reset = false idleStrategy.reset() diff --git a/akka-remote/src/test/resources/aeron.properties b/akka-remote/src/test/resources/aeron.properties index db195e1075..007050a287 100644 --- a/akka-remote/src/test/resources/aeron.properties +++ b/akka-remote/src/test/resources/aeron.properties @@ -11,6 +11,11 @@ agrona.disable.bounds.checks=true aeron.threading.mode=SHARED_NETWORK +# low latency settings +#aeron.threading.mode=DEDICATED +#aeron.sender.idle.strategy=org.agrona.concurrent.BusySpinIdleStrategy +#aeron.receiver.idle.strategy=org.agrona.concurrent.BusySpinIdleStrategy + # use same director in akka.remote.artery.advanced.aeron-dir config # of the Akka application aeron.dir=target/aeron diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala deleted file mode 100644 index 6a8bf49089..0000000000 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. 
- */ -package akka.remote.artery - -import scala.concurrent.duration._ -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Source -import io.aeron.Aeron -import io.aeron.driver.MediaDriver -import java.util.concurrent.Executors -import scala.util.Success -import scala.util.Failure -import scala.concurrent.Future -import akka.Done -import org.HdrHistogram.Histogram -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.CountDownLatch -import java.util.concurrent.CyclicBarrier -import java.util.concurrent.atomic.AtomicLongArray -import akka.stream.ThrottleMode -import org.agrona.ErrorHandler -import io.aeron.AvailableImageHandler -import io.aeron.UnavailableImageHandler -import io.aeron.Image -import io.aeron.AvailableImageHandler -import akka.actor.ExtendedActorSystem -import java.io.File -import io.aeron.CncFileDescriptor - -object AeronStreamsApp { - - val channel1 = "aeron:udp?endpoint=localhost:40123" - val channel2 = "aeron:udp?endpoint=localhost:40124" - val streamId = 1 - val throughputN = 10000000 - val latencyRate = 10000 // per second - val latencyN = 10 * latencyRate - val payload = ("0" * 100).getBytes("utf-8") - val giveUpSendAfter = 60.seconds - lazy val sendTimes = new AtomicLongArray(latencyN) - - lazy val driver = { - val driverContext = new MediaDriver.Context - driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) - driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) - driverContext.driverTimeoutMs(SECONDS.toNanos(10)) - MediaDriver.launchEmbedded(driverContext) - } - - lazy val stats = { - new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) - } - - lazy val aeron = { - val ctx = new Aeron.Context - ctx.errorHandler(new ErrorHandler { - override def onError(cause: Throwable) { - println(s"# Aeron onError " + cause) // FIXME - } - }) - ctx.availableImageHandler(new AvailableImageHandler { - override def 
onAvailableImage(img: Image): Unit = { - println(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") - } - }) - ctx.unavailableImageHandler(new UnavailableImageHandler { - override def onUnavailableImage(img: Image): Unit = { - println(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") - } - }) - - ctx.aeronDirectoryName(driver.aeronDirectoryName) - Aeron.connect(ctx) - } - - lazy val system = ActorSystem("AeronStreams") - lazy implicit val mat = ActorMaterializer()(system) - - val idleCpuLevel = 5 - lazy val taskRunner = { - val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel) - r.start() - r - } - - lazy val reporter = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { - override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { - println("%.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( - messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) - } - }) - lazy val reporterExecutor = Executors.newFixedThreadPool(1) - - lazy val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumFrameSize) - - def stopReporter(): Unit = { - reporter.halt() - reporterExecutor.shutdown() - } - - def exit(status: Int): Unit = { - stopReporter() - - system.scheduler.scheduleOnce(10.seconds) { - mat.shutdown() - system.terminate() - new Thread { - Thread.sleep(3000) - System.exit(status) - }.run() - }(system.dispatcher) - } - - lazy val histogram = new Histogram(SECONDS.toNanos(10), 3) - - def printTotal(total: Int, pre: String, startTime: Long, payloadSize: Long): Unit = { - val d = (System.nanoTime - startTime).nanos.toMillis - println(f"### $total $pre of size ${payloadSize} bytes took $d ms, " + - f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s") - - if (histogram.getTotalCount > 0) { - println("Histogram of RTT latencies in 
microseconds.") - histogram.outputPercentileDistribution(System.out, 1000.0) - } - } - - def main(args: Array[String]): Unit = { - - // receiver of plain throughput testing - if (args.length == 0 || args(0) == "receiver") - runReceiver() - - // sender of plain throughput testing - if (args.length == 0 || args(0) == "sender") - runSender() - - // sender of ping-pong latency testing - if (args.length != 0 && args(0) == "echo-sender") - runEchoSender() - - // echo receiver of ping-pong latency testing - if (args.length != 0 && args(0) == "echo-receiver") - runEchoReceiver() - - if (args.length != 0 && args(0) == "debug-receiver") - runDebugReceiver() - - if (args.length != 0 && args(0) == "debug-sender") - runDebugSender() - - if (args.length >= 2 && args(1) == "stats") - runStats() - } - - def runReceiver(): Unit = { - import system.dispatcher - reporterExecutor.execute(reporter) - val r = reporter - var t0 = System.nanoTime() - var count = 0L - var payloadSize = 0L - Source.fromGraph(new AeronSource(channel1, streamId, aeron, taskRunner, pool, IgnoreEventSink)) - .map { envelope ⇒ - r.onMessage(1, envelope.byteBuffer.limit) - envelope - } - .runForeach { envelope ⇒ - count += 1 - if (count == 1) { - t0 = System.nanoTime() - payloadSize = envelope.byteBuffer.limit - } else if (count == throughputN) { - exit(0) - printTotal(throughputN, "receive", t0, payloadSize) - } - pool.release(envelope) - }.onFailure { - case e ⇒ - e.printStackTrace - exit(-1) - } - - } - - def runSender(): Unit = { - reporterExecutor.execute(reporter) - val r = reporter - val t0 = System.nanoTime() - Source(1 to throughputN) - .map { n ⇒ - if (n == throughputN) { - exit(0) - printTotal(throughputN, "send", t0, payload.length) - } - n - } - .map { _ ⇒ - r.onMessage(1, payload.length) - val envelope = pool.acquire() - envelope.byteBuffer.put(payload) - envelope.byteBuffer.flip() - envelope - } - .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) 
- } - - def runEchoReceiver(): Unit = { - // just echo back on channel2 - reporterExecutor.execute(reporter) - val r = reporter - Source.fromGraph(new AeronSource(channel1, streamId, aeron, taskRunner, pool, IgnoreEventSink)) - .map { envelope ⇒ - r.onMessage(1, envelope.byteBuffer.limit) - envelope - } - .runWith(new AeronSink(channel2, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) - } - - def runEchoSender(): Unit = { - import system.dispatcher - reporterExecutor.execute(reporter) - val r = reporter - - val barrier = new CyclicBarrier(2) - var repeat = 3 - val count = new AtomicInteger - var t0 = System.nanoTime() - Source.fromGraph(new AeronSource(channel2, streamId, aeron, taskRunner, pool, IgnoreEventSink)) - .map { envelope ⇒ - r.onMessage(1, envelope.byteBuffer.limit) - envelope - } - .runForeach { envelope ⇒ - val c = count.incrementAndGet() - val d = System.nanoTime() - sendTimes.get(c - 1) - if (c % (latencyN / 10) == 0) - println(s"# receive offset $c => ${d / 1000} µs") // FIXME - histogram.recordValue(d) - if (c == latencyN) { - printTotal(latencyN, "ping-pong", t0, envelope.byteBuffer.limit) - barrier.await() // this is always the last party - } - pool.release(envelope) - }.onFailure { - case e ⇒ - e.printStackTrace - exit(-1) - } - - while (repeat > 0) { - repeat -= 1 - histogram.reset() - count.set(0) - t0 = System.nanoTime() - - Source(1 to latencyN) - .throttle(latencyRate, 1.second, latencyRate / 10, ThrottleMode.Shaping) - .map { n ⇒ - if (n % (latencyN / 10) == 0) - println(s"# send offset $n") // FIXME - sendTimes.set(n - 1, System.nanoTime()) - val envelope = pool.acquire() - envelope.byteBuffer.put(payload) - envelope.byteBuffer.flip() - envelope - } - .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) - - barrier.await() - } - - exit(0) - } - - def runDebugReceiver(): Unit = { - import system.dispatcher - Source.fromGraph(new AeronSource(channel1, streamId, aeron, 
taskRunner, pool, IgnoreEventSink)) - .map { envelope ⇒ - val bytes = Array.ofDim[Byte](envelope.byteBuffer.limit) - envelope.byteBuffer.get(bytes) - pool.release(envelope) - new String(bytes, "utf-8") - } - .runForeach { s ⇒ - println(s) - }.onFailure { - case e ⇒ - e.printStackTrace - exit(-1) - } - - } - - def runDebugSender(): Unit = { - val fill = "0000" - Source(1 to 1000) - .throttle(1, 1.second, 1, ThrottleMode.Shaping) - .map { n ⇒ - val s = (fill + n.toString).takeRight(4) - println(s) - val envelope = pool.acquire() - envelope.byteBuffer.put(s.getBytes("utf-8")) - envelope.byteBuffer.flip() - envelope - } - .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) - } - - def runStats(): Unit = { - Source.tick(10.second, 10.second, "tick").runForeach { _ ⇒ stats.print(System.out) } - } - -}