first prototype of Aeron Source & Sink

* and test program to meassure latency and throughput
This commit is contained in:
Patrik Nordwall 2016-04-08 10:41:32 +02:00
parent 78b88c419d
commit a033d52b37
5 changed files with 532 additions and 2 deletions

View file

@ -0,0 +1,115 @@
/*
* Copyright 2014 - 2016 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.aeron;
import java.util.concurrent.locks.LockSupport;
/**
* Tracker and reporter of rates.
*
* Uses volatile semantics for counters.
*/
public class RateReporter implements Runnable
{
/**
* Interface for reporting of rate information
*/
@FunctionalInterface
public interface Reporter
{
/**
* Called for a rate report.
*
* @param messagesPerSec since last report
* @param bytesPerSec since last report
* @param totalMessages since beginning of reporting
* @param totalBytes since beginning of reporting
*/
void onReport(double messagesPerSec, double bytesPerSec, long totalMessages, long totalBytes);
}
private final long reportIntervalNs;
private final long parkNs;
private final Reporter reportingFunc;
private volatile boolean halt = false;
private volatile long totalBytes;
private volatile long totalMessages;
private long lastTotalBytes;
private long lastTotalMessages;
private long lastTimestamp;
/**
* Create a rate reporter with the given report interval in nanoseconds and the reporting function.
*
* @param reportInterval in nanoseconds
* @param reportingFunc to call for reporting rates
*/
public RateReporter(final long reportInterval, final Reporter reportingFunc)
{
this.reportIntervalNs = reportInterval;
this.parkNs = reportInterval;
this.reportingFunc = reportingFunc;
lastTimestamp = System.nanoTime();
}
/**
* Run loop for the rate reporter
*/
@Override
public void run()
{
do
{
LockSupport.parkNanos(parkNs);
final long currentTotalMessages = totalMessages;
final long currentTotalBytes = totalBytes;
final long currentTimestamp = System.nanoTime();
final long timeSpanNs = currentTimestamp - lastTimestamp;
final double messagesPerSec = ((currentTotalMessages - lastTotalMessages) * reportIntervalNs) / (double)timeSpanNs;
final double bytesPerSec = ((currentTotalBytes - lastTotalBytes) * reportIntervalNs) / (double)timeSpanNs;
reportingFunc.onReport(messagesPerSec, bytesPerSec, currentTotalMessages, currentTotalBytes);
lastTotalBytes = currentTotalBytes;
lastTotalMessages = currentTotalMessages;
lastTimestamp = currentTimestamp;
}
while (!halt);
}
/**
* Signal the run loop to exit. Does not block.
*/
public void halt()
{
halt = true;
}
/**
* Tell rate reporter of number of messages and bytes received, sent, etc.
*
* @param messages received, sent, etc.
* @param bytes received, sent, etc.
*/
public void onMessage(final long messages, final long bytes)
{
totalBytes += bytes;
totalMessages += messages;
}
}

View file

@ -0,0 +1,97 @@
package akka.aeron
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.concurrent.duration._
import akka.stream.Attributes
import akka.stream.Inlet
import akka.stream.SinkShape
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.TimerGraphStageLogic
import io.aeron.Aeron
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.UnsafeBuffer
object AeronSink {
type Bytes = Array[Byte]
private case object Backoff
}
/**
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSink(channel: String, aeron: () => Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] {
import AeronSink._
val in: Inlet[Bytes] = Inlet("AeronSink")
override val shape: SinkShape[Bytes] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler {
private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024))
private val streamId = 10
private val pub = aeron().addPublication(channel, streamId)
private val idleStrategy = new BackoffIdleStrategy(
100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100))
private val retries = 120
private var backoffCount = retries
private var lastMsgSize = 0
override def preStart(): Unit = pull(in)
override def postStop(): Unit = {
pub.close()
}
// InHandler
override def onPush(): Unit = {
val msg = grab(in)
buffer.putBytes(0, msg);
idleStrategy.reset()
backoffCount = retries
lastMsgSize = msg.length
publish()
}
@tailrec private def publish(): Unit = {
val result = pub.offer(buffer, 0, lastMsgSize)
// FIXME handle Publication.CLOSED
// TODO the backoff strategy should be measured and tuned
if (result < 0) {
if (backoffCount == 1) {
println(s"# drop") // FIXME
pull(in) // drop it
} else if (backoffCount <= 5) {
// println(s"# scheduled backoff ${6 - backoffCount}") // FIXME
backoffCount -= 1
if (backoffCount <= 2)
scheduleOnce(Backoff, 50.millis)
else
scheduleOnce(Backoff, 1.millis)
} else {
idleStrategy.idle()
backoffCount -= 1
publish() // recursive
}
} else {
pull(in)
}
}
override protected def onTimer(timerKey: Any): Unit = {
timerKey match {
case Backoff => publish()
case msg => super.onTimer(msg)
}
}
setHandler(in, this)
}
}

View file

@ -0,0 +1,103 @@
package akka.aeron
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
import scala.concurrent.duration._
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import io.aeron.Aeron
import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import org.agrona.DirectBuffer
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.UnsafeBuffer
object AeronSource {
type Bytes = Array[Byte]
private case object Backoff
}
/**
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSource(channel: String, aeron: () => Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] {
import AeronSource._
val out: Outlet[Bytes] = Outlet("AeronSource")
override val shape: SourceShape[Bytes] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with OutHandler {
private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(256))
private val streamId = 10
private val sub = aeron().addSubscription(channel, streamId)
private val running = new AtomicBoolean(true)
private val idleStrategy = new BackoffIdleStrategy(
100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100))
private val retries = 115
private var backoffCount = retries
val receiveMessage = getAsyncCallback[Bytes] { data =>
push(out, data)
}
val fragmentHandler: FragmentHandler = new FragmentHandler {
override def onFragment(buffer: DirectBuffer, offset: Int, length: Int, header: Header): Unit = {
val data = Array.ofDim[Byte](length)
buffer.getBytes(offset, data);
receiveMessage.invoke(data)
}
}
override def postStop(): Unit = {
running.set(false)
sub.close()
}
// OutHandler
override def onPull(): Unit = {
idleStrategy.reset()
backoffCount = retries
subscriberLoop()
}
@tailrec private def subscriberLoop(): Unit =
if (running.get) {
val fragmentsRead = sub.poll(fragmentHandler, 1)
if (fragmentsRead <= 0) {
// TODO the backoff strategy should be measured and tuned
if (backoffCount <= 0) {
// println(s"# scheduled backoff ${0 - backoffCount + 1}") // FIXME
backoffCount -= 1
if (backoffCount <= -5)
scheduleOnce(Backoff, 50.millis)
else
scheduleOnce(Backoff, 1.millis)
} else {
idleStrategy.idle()
backoffCount -= 1
subscriberLoop() // recursive
}
}
}
override protected def onTimer(timerKey: Any): Unit = {
timerKey match {
case Backoff => subscriberLoop()
case msg => super.onTimer(msg)
}
}
setHandler(out, this)
}
}

View file

@ -0,0 +1,210 @@
package akka.aeron
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import java.util.concurrent.Executors
import scala.util.Success
import scala.util.Failure
import scala.concurrent.Future
import akka.Done
import org.HdrHistogram.Histogram
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicLongArray
import akka.stream.ThrottleMode
object AeronStreams {
val channel1 = "aeron:udp?endpoint=localhost:40123"
val channel2 = "aeron:udp?endpoint=localhost:40124"
val throughputN = 10000000
val latencyN = 100000
val payload = ("0" * 100).getBytes("utf-8")
lazy val sendTimes = new AtomicLongArray(latencyN)
lazy val aeron = {
val ctx = new Aeron.Context
val driver = MediaDriver.launchEmbedded()
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
lazy val system = ActorSystem("AeronStreams")
lazy implicit val mat = ActorMaterializer()(system)
lazy val reporter = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter {
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
println("%.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format(
messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024)))
}
})
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def stopReporter(): Unit = {
reporter.halt()
reporterExecutor.shutdown()
}
def exit(status: Int): Unit = {
stopReporter()
system.scheduler.scheduleOnce(10.seconds) {
mat.shutdown()
system.terminate()
new Thread {
Thread.sleep(3000)
System.exit(status)
}.run()
}(system.dispatcher)
}
lazy val histogram = new Histogram(SECONDS.toNanos(10), 3)
def printTotal(total: Int, pre: String, startTime: Long, payloadSize: Long): Unit = {
val d = (System.nanoTime - startTime).nanos.toMillis
println(f"### $total $pre of size ${payloadSize} bytes took $d ms, " +
f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s")
if (histogram.getTotalCount > 0) {
println("Histogram of RTT latencies in microseconds.")
histogram.outputPercentileDistribution(System.out, 1000.0)
}
}
def main(args: Array[String]): Unit = {
// receiver of plain throughput testing
if (args.length == 0 || args(0) == "receiver")
runReceiver()
// sender of plain throughput testing
if (args.length == 0 || args(0) == "sender")
runSender()
// sender of ping-pong latency testing
if (args.length != 0 && args(0) == "echo-sender")
runEchoSender()
// echo receiver of ping-pong latency testing
if (args.length != 0 && args(0) == "echo-receiver")
runEchoReceiver()
}
def runReceiver(): Unit = {
import system.dispatcher
reporterExecutor.execute(reporter)
val r = reporter
var t0 = System.nanoTime()
var count = 0L
var payloadSize = 0L
Source.fromGraph(new AeronSource(channel1, () => aeron))
.map { bytes =>
r.onMessage(1, bytes.length)
bytes
}
.runForeach { bytes =>
count += 1
if (count == 1) {
t0 = System.nanoTime()
payloadSize = bytes.length
} else if (count == throughputN) {
exit(0)
printTotal(throughputN, "receive", t0, payloadSize)
}
}.onFailure {
case e =>
e.printStackTrace
exit(-1)
}
}
def runSender(): Unit = {
reporterExecutor.execute(reporter)
val r = reporter
val t0 = System.nanoTime()
Source(1 to throughputN)
.map { n =>
if (n == throughputN) {
exit(0)
printTotal(throughputN, "send", t0, payload.length)
}
n
}
.map { _ =>
r.onMessage(1, payload.length)
payload
}
.runWith(new AeronSink(channel1, () => aeron))
}
def runEchoReceiver(): Unit = {
// just echo back on channel2
reporterExecutor.execute(reporter)
val r = reporter
Source.fromGraph(new AeronSource(channel1, () => aeron))
.map { bytes =>
r.onMessage(1, bytes.length)
bytes
}
.runWith(new AeronSink(channel2, () => aeron))
}
def runEchoSender(): Unit = {
import system.dispatcher
reporterExecutor.execute(reporter)
val r = reporter
val barrier = new CyclicBarrier(2)
var repeat = 3
val count = new AtomicInteger
var t0 = System.nanoTime()
Source.fromGraph(new AeronSource(channel2, () => aeron))
.map { bytes =>
r.onMessage(1, bytes.length)
bytes
}
.runForeach { bytes =>
val c = count.incrementAndGet()
val d = System.nanoTime() - sendTimes.get(c - 1)
if (c % 10000 == 0)
println(s"# receive offset $c => ${d / 1000} µs") // FIXME
histogram.recordValue(d)
if (c == latencyN) {
printTotal(latencyN, "ping-pong", t0, bytes.length)
barrier.await() // this is always the last party
}
}.onFailure {
case e =>
e.printStackTrace
exit(-1)
}
while (repeat > 0) {
repeat -= 1
histogram.reset()
count.set(0)
t0 = System.nanoTime()
Source(1 to latencyN)
.throttle(10000, 1.second, 100000, ThrottleMode.Shaping)
.map { n =>
if (n % 10000 == 0)
println(s"# send offset $n") // FIXME
sendTimes.set(n - 1, System.nanoTime())
payload
}
.runWith(new AeronSink(channel1, () => aeron))
barrier.await()
}
exit(0)
}
}

View file

@ -66,6 +66,10 @@ object Dependencies {
// For Java 8 Conversions // For Java 8 Conversions
val java8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.7.0" // Scala License val java8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.7.0" // Scala License
val aeronDriver = "io.aeron" % "aeron-driver" % "0.9.5" // ApacheV2
val aeronClient = "io.aeron" % "aeron-client" % "0.9.5" // ApacheV2
val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" // CC0
object Docs { object Docs {
val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test" val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test"
@ -92,7 +96,8 @@ object Dependencies {
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2 val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2
val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.2" % "test" // ApacheV2 val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.2" % "test" // ApacheV2
val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD
val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.1.4" % "test" // CC0 val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" % "test" // CC0
val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram) val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram)
// sigar logging // sigar logging
@ -161,7 +166,7 @@ object Dependencies {
val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative) val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, aeronDriver, aeronClient, hdrHistogram)
// akka stream & http // akka stream & http