diff --git a/akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java b/akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java new file mode 100644 index 0000000000..e042e58bab --- /dev/null +++ b/akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java @@ -0,0 +1,115 @@ +/* + * Copyright 2014 - 2016 Real Logic Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package akka.aeron; + +import java.util.concurrent.locks.LockSupport; + +/** + * Tracker and reporter of rates. + * + * Uses volatile semantics for counters. + */ +public class RateReporter implements Runnable +{ + /** + * Interface for reporting of rate information + */ + @FunctionalInterface + public interface Reporter + { + /** + * Called for a rate report. + * + * @param messagesPerSec since last report + * @param bytesPerSec since last report + * @param totalMessages since beginning of reporting + * @param totalBytes since beginning of reporting + */ + void onReport(double messagesPerSec, double bytesPerSec, long totalMessages, long totalBytes); + } + + private final long reportIntervalNs; + private final long parkNs; + private final Reporter reportingFunc; + + private volatile boolean halt = false; + private volatile long totalBytes; + private volatile long totalMessages; + private long lastTotalBytes; + private long lastTotalMessages; + private long lastTimestamp; + + /** + * Create a rate reporter with the given report interval in nanoseconds and the reporting function. + * + * @param reportInterval in nanoseconds + * @param reportingFunc to call for reporting rates + */ + public RateReporter(final long reportInterval, final Reporter reportingFunc) + { + this.reportIntervalNs = reportInterval; + this.parkNs = reportInterval; + this.reportingFunc = reportingFunc; + lastTimestamp = System.nanoTime(); + } + + /** + * Run loop for the rate reporter + */ + @Override + public void run() + { + do + { + LockSupport.parkNanos(parkNs); + + final long currentTotalMessages = totalMessages; + final long currentTotalBytes = totalBytes; + final long currentTimestamp = System.nanoTime(); + + final long timeSpanNs = currentTimestamp - lastTimestamp; + final double messagesPerSec = ((currentTotalMessages - lastTotalMessages) * reportIntervalNs) / (double)timeSpanNs; + final double bytesPerSec = ((currentTotalBytes - lastTotalBytes) * reportIntervalNs) / (double)timeSpanNs; + + reportingFunc.onReport(messagesPerSec, bytesPerSec, currentTotalMessages, currentTotalBytes); + + lastTotalBytes = currentTotalBytes; + lastTotalMessages = currentTotalMessages; + lastTimestamp = currentTimestamp; + } + while (!halt); + } + + /** + * Signal the run loop to exit. Does not block. + */ + public void halt() + { + halt = true; + } + + /** + * Tell rate reporter of number of messages and bytes received, sent, etc. + * + * @param messages received, sent, etc. + * @param bytes received, sent, etc. + */ + public void onMessage(final long messages, final long bytes) + { + totalBytes += bytes; + totalMessages += messages; + } +} \ No newline at end of file diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala b/akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala new file mode 100644 index 0000000000..e446fed180 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/aeron/AeronSink.scala @@ -0,0 +1,97 @@ +package akka.aeron + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit + +import scala.annotation.tailrec +import scala.concurrent.duration._ + +import akka.stream.Attributes +import akka.stream.Inlet +import akka.stream.SinkShape +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.InHandler +import akka.stream.stage.TimerGraphStageLogic +import io.aeron.Aeron +import org.agrona.concurrent.BackoffIdleStrategy +import org.agrona.concurrent.UnsafeBuffer + +object AeronSink { + type Bytes = Array[Byte] + private case object Backoff +} + +/** + * @param channel eg. "aeron:udp?endpoint=localhost:40123" + */ +class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] { + import AeronSink._ + + val in: Inlet[Bytes] = Inlet("AeronSink") + override val shape: SinkShape[Bytes] = SinkShape(in) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler { + + private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024)) + private val streamId = 10 + private val pub = aeron().addPublication(channel, streamId) + private val idleStrategy = new BackoffIdleStrategy( + 100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100)) + private val retries = 120 + + private var backoffCount = retries + private var lastMsgSize = 0 + + override def preStart(): Unit = pull(in) + + override def postStop(): Unit = { + pub.close() + } + + // InHandler + override def onPush(): Unit = { + val msg = grab(in) + buffer.putBytes(0, msg); + idleStrategy.reset() + backoffCount = retries + lastMsgSize = msg.length + publish() + } + + @tailrec private def publish(): Unit = { + val result = pub.offer(buffer, 0, lastMsgSize) + // FIXME handle Publication.CLOSED + // TODO the backoff strategy should be measured and tuned + if (result < 0) { + if (backoffCount == 1) { + println(s"# drop") // FIXME + pull(in) // drop it + } else if (backoffCount <= 5) { + // println(s"# scheduled backoff ${6 - backoffCount}") // FIXME + backoffCount -= 1 + if (backoffCount <= 2) + scheduleOnce(Backoff, 50.millis) + else + scheduleOnce(Backoff, 1.millis) + } else { + idleStrategy.idle() + backoffCount -= 1 + publish() // recursive + } + } else { + pull(in) + } + } + + override protected def onTimer(timerKey: Any): Unit = { + timerKey match { + case Backoff => publish() + case msg => super.onTimer(msg) + } + } + + setHandler(in, this) + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala b/akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala new file mode 100644 index 0000000000..02bd73c9fb --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/aeron/AeronSource.scala @@ -0,0 +1,103 @@ +package akka.aeron + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +import scala.annotation.tailrec +import scala.concurrent.duration._ + +import akka.stream.Attributes +import akka.stream.Outlet +import akka.stream.SourceShape +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.OutHandler +import akka.stream.stage.TimerGraphStageLogic +import io.aeron.Aeron +import io.aeron.logbuffer.FragmentHandler +import io.aeron.logbuffer.Header +import org.agrona.DirectBuffer +import org.agrona.concurrent.BackoffIdleStrategy +import org.agrona.concurrent.UnsafeBuffer + +object AeronSource { + type Bytes = Array[Byte] + private case object Backoff +} + +/** + * @param channel eg. "aeron:udp?endpoint=localhost:40123" + */ +class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] { + import AeronSource._ + + val out: Outlet[Bytes] = Outlet("AeronSource") + override val shape: SourceShape[Bytes] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with OutHandler { + + private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(256)) + private val streamId = 10 + private val sub = aeron().addSubscription(channel, streamId) + private val running = new AtomicBoolean(true) + private val idleStrategy = new BackoffIdleStrategy( + 100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100)) + private val retries = 115 + private var backoffCount = retries + + val receiveMessage = getAsyncCallback[Bytes] { data => + push(out, data) + } + + val fragmentHandler: FragmentHandler = new FragmentHandler { + override def onFragment(buffer: DirectBuffer, offset: Int, length: Int, header: Header): Unit = { + val data = Array.ofDim[Byte](length) + buffer.getBytes(offset, data); + receiveMessage.invoke(data) + } + } + + override def postStop(): Unit = { + running.set(false) + sub.close() + } + + // OutHandler + override def onPull(): Unit = { + idleStrategy.reset() + backoffCount = retries + subscriberLoop() + } + + @tailrec private def subscriberLoop(): Unit = + if (running.get) { + val fragmentsRead = sub.poll(fragmentHandler, 1) + if (fragmentsRead <= 0) { + // TODO the backoff strategy should be measured and tuned + if (backoffCount <= 0) { + // println(s"# scheduled backoff ${0 - backoffCount + 1}") // FIXME + backoffCount -= 1 + if (backoffCount <= -5) + scheduleOnce(Backoff, 50.millis) + else + scheduleOnce(Backoff, 1.millis) + } else { + idleStrategy.idle() + backoffCount -= 1 + subscriberLoop() // recursive + } + } + } + + override protected def onTimer(timerKey: Any): Unit = { + timerKey match { + case Backoff => subscriberLoop() + case msg => super.onTimer(msg) + } + } + + setHandler(out, this) + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala b/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala new file mode 100644 index 0000000000..79a1f41b47 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala @@ -0,0 +1,210 @@ +package akka.aeron + +import scala.concurrent.duration._ +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Source +import io.aeron.Aeron +import io.aeron.driver.MediaDriver +import java.util.concurrent.Executors +import scala.util.Success +import scala.util.Failure +import scala.concurrent.Future +import akka.Done +import org.HdrHistogram.Histogram +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.CountDownLatch +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.AtomicLongArray +import akka.stream.ThrottleMode + +object AeronStreams { + + val channel1 = "aeron:udp?endpoint=localhost:40123" + val channel2 = "aeron:udp?endpoint=localhost:40124" + val throughputN = 10000000 + val latencyN = 100000 + val payload = ("0" * 100).getBytes("utf-8") + lazy val sendTimes = new AtomicLongArray(latencyN) + + lazy val aeron = { + val ctx = new Aeron.Context + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + lazy val system = ActorSystem("AeronStreams") + lazy implicit val mat = ActorMaterializer()(system) + + lazy val reporter = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println("%.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + + def stopReporter(): Unit = { + reporter.halt() + reporterExecutor.shutdown() + } + + def exit(status: Int): Unit = { + stopReporter() + + system.scheduler.scheduleOnce(10.seconds) { + mat.shutdown() + system.terminate() + new Thread { + Thread.sleep(3000) + System.exit(status) + }.run() + }(system.dispatcher) + } + + lazy val histogram = new Histogram(SECONDS.toNanos(10), 3) + + def printTotal(total: Int, pre: String, startTime: Long, payloadSize: Long): Unit = { + val d = (System.nanoTime - startTime).nanos.toMillis + println(f"### $total $pre of size ${payloadSize} bytes took $d ms, " + + f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s") + + if (histogram.getTotalCount > 0) { + println("Histogram of RTT latencies in microseconds.") + histogram.outputPercentileDistribution(System.out, 1000.0) + } + } + + def main(args: Array[String]): Unit = { + + // receiver of plain throughput testing + if (args.length == 0 || args(0) == "receiver") + runReceiver() + + // sender of plain throughput testing + if (args.length == 0 || args(0) == "sender") + runSender() + + // sender of ping-pong latency testing + if (args.length != 0 && args(0) == "echo-sender") + runEchoSender() + + // echo receiver of ping-pong latency testing + if (args.length != 0 && args(0) == "echo-receiver") + runEchoReceiver() + } + + def runReceiver(): Unit = { + import system.dispatcher + reporterExecutor.execute(reporter) + val r = reporter + var t0 = System.nanoTime() + var count = 0L + var payloadSize = 0L + Source.fromGraph(new AeronSource(channel1, () => aeron)) + .map { bytes => + r.onMessage(1, bytes.length) + bytes + } + .runForeach { bytes => + count += 1 + if (count == 1) { + t0 = System.nanoTime() + payloadSize = bytes.length + } else if (count == throughputN) { + exit(0) + printTotal(throughputN, "receive", t0, payloadSize) + } + }.onFailure { + case e => + e.printStackTrace + exit(-1) + } + + } + + def runSender(): Unit = { + reporterExecutor.execute(reporter) + val r = reporter + val t0 = System.nanoTime() + Source(1 to throughputN) + .map { n => + if (n == throughputN) { + exit(0) + printTotal(throughputN, "send", t0, payload.length) + } + n + } + .map { _ => + r.onMessage(1, payload.length) + payload + } + .runWith(new AeronSink(channel1, () => aeron)) + } + + def runEchoReceiver(): Unit = { + // just echo back on channel2 + reporterExecutor.execute(reporter) + val r = reporter + Source.fromGraph(new AeronSource(channel1, () => aeron)) + .map { bytes => + r.onMessage(1, bytes.length) + bytes + } + .runWith(new AeronSink(channel2, () => aeron)) + } + + def runEchoSender(): Unit = { + import system.dispatcher + reporterExecutor.execute(reporter) + val r = reporter + + val barrier = new CyclicBarrier(2) + var repeat = 3 + val count = new AtomicInteger + var t0 = System.nanoTime() + Source.fromGraph(new AeronSource(channel2, () => aeron)) + .map { bytes => + r.onMessage(1, bytes.length) + bytes + } + .runForeach { bytes => + val c = count.incrementAndGet() + val d = System.nanoTime() - sendTimes.get(c - 1) + if (c % 10000 == 0) + println(s"# receive offset $c => ${d / 1000} µs") // FIXME + histogram.recordValue(d) + if (c == latencyN) { + printTotal(latencyN, "ping-pong", t0, bytes.length) + barrier.await() // this is always the last party + } + }.onFailure { + case e => + e.printStackTrace + exit(-1) + } + + while (repeat > 0) { + repeat -= 1 + histogram.reset() + count.set(0) + t0 = System.nanoTime() + + Source(1 to latencyN) + .throttle(10000, 1.second, 100000, ThrottleMode.Shaping) + .map { n => + if (n % 10000 == 0) + println(s"# send offset $n") // FIXME + sendTimes.set(n - 1, System.nanoTime()) + payload + } + .runWith(new AeronSink(channel1, () => aeron)) + + barrier.await() + } + + exit(0) + } + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6424fcd3c0..1a714dee6b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -66,6 +66,10 @@ object Dependencies { // For Java 8 Conversions val java8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.7.0" // Scala License + + val aeronDriver = "io.aeron" % "aeron-driver" % "0.9.5" // ApacheV2 + val aeronClient = "io.aeron" % "aeron-client" % "0.9.5" // ApacheV2 + val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" // CC0 object Docs { val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test" @@ -92,7 +96,8 @@ object Dependencies { val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2 val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.2" % "test" // ApacheV2 val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD - val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.1.4" % "test" // CC0 + val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" % "test" // CC0 + val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram) // sigar logging @@ -161,7 +166,7 @@ object Dependencies { val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative) + val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, aeronDriver, aeronClient, hdrHistogram) // akka stream & http