diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index aae55a1db7..b8969bfc6a 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -196,6 +196,7 @@ object MultiNodeSpec { private[testkit] val nodeConfig = mapToConfig(Map( "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", + "akka.remote.artery.hostname" -> selfName, "akka.remote.netty.tcp.hostname" -> selfName, "akka.remote.netty.tcp.port" -> selfPort)) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala new file mode 100644 index 0000000000..c6cd4a7fe1 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -0,0 +1,212 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLongArray + +import scala.concurrent.duration._ + +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.stream.ActorMaterializer +import akka.stream.ThrottleMode +import akka.stream.scaladsl.Source +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import io.aeron.Aeron +import io.aeron.driver.MediaDriver +import org.HdrHistogram.Histogram + +object AeronStreamLatencySpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3) + akka.test.AeronStreamLatencySpec.totalMessagesFactor = 1.0 + akka.test.AeronStreamLatencySpec.repeatCount = 1 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery.enabled = off + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20521 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20522 + } + + final case class TestSettings( + testName: String, + messageRate: Int, // msg/s + payloadSize: Int, + repeat: Int) + +} + +class AeronStreamLatencySpecMultiJvmNode1 extends AeronStreamLatencySpec +class AeronStreamLatencySpecMultiJvmNode2 extends AeronStreamLatencySpec + +abstract class AeronStreamLatencySpec + extends MultiNodeSpec(AeronStreamLatencySpec) + with STMultiNodeSpec with ImplicitSender { + + import AeronStreamLatencySpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamLatencySpec.totalMessagesFactor") + val repeatCount = system.settings.config.getInt("akka.test.AeronStreamLatencySpec.repeatCount") + + val aeron = { + val ctx = new Aeron.Context + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + lazy implicit val mat = ActorMaterializer()(system) + import system.dispatcher + + override def initialParticipants = roles.size + + def channel(roleName: RoleName) = { + val a = node(roleName).address + s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}" + } + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = { + import scala.collection.JavaConverters._ + val percentiles = histogram.percentiles(5) + def percentile(p: Double): Double = + percentiles.iterator().asScala.collectFirst { + case value if (p - 0.5) < value.getPercentileLevelIteratedTo && + value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0 + }.getOrElse(Double.NaN) + + println(s"=== AeronStreamLatency $testName: RTT " + + f"50%%ile: ${percentile(50.0)}%.0f µs, " + + f"90%%ile: ${percentile(90.0)}%.0f µs, " + + f"99%%ile: ${percentile(99.0)}%.0f µs, ") + println("Histogram of RTT latencies in microseconds.") + histogram.outputPercentileDistribution(System.out, 1000.0) + } + + val scenarios = List( + TestSettings( + testName = "rate-100-size-100", + messageRate = 100, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-100", + messageRate = 1000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-10000-size-100", + messageRate = 10000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-1k", + messageRate = 1000, + payloadSize = 1000, + repeat = repeatCount)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + + runOn(first) { + val payload = ("0" * payloadSize).getBytes("utf-8") + // by default run for 2 seconds, but can be adjusted with the totalMessagesFactor + val totalMessages = (2 * messageRate * totalMessagesFactor).toInt + val sendTimes = new AtomicLongArray(totalMessages) + val histogram = new Histogram(SECONDS.toNanos(10), 3) + + val rep = reporter(testName) + val barrier = new CyclicBarrier(2) + val count = new AtomicInteger + Source.fromGraph(new AeronSource(channel(first), aeron)) + .runForeach { bytes ⇒ + if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") + rep.onMessage(1, payloadSize) + val c = count.incrementAndGet() + val d = System.nanoTime() - sendTimes.get(c - 1) + histogram.recordValue(d) + if (c == totalMessages) { + printTotal(testName, bytes.length, histogram) + barrier.await() // this is always the last party + } + } + + for (n ← 1 to repeat) { + histogram.reset() + count.set(0) + + Source(1 to totalMessages) + .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) + .map { n ⇒ + sendTimes.set(n - 1, System.nanoTime()) + payload + } + .runWith(new AeronSink(channel(second), aeron)) + + barrier.await((totalMessages / messageRate) + 10, SECONDS) + } + + rep.halt() + } + + enterBarrier("after-" + testName) + } + + "Latency of Aeron Streams" must { + + "start echo" in { + runOn(second) { + // just echo back + Source.fromGraph(new AeronSource(channel(second), aeron)) + .runWith(new AeronSink(channel(first), aeron)) + } + enterBarrier("echo-started") + } + + for (s ← scenarios) { + s"be low for ${s.testName}, at ${s.messageRate} msg/s, payloadSize = ${s.payloadSize}" in test(s) + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala new file mode 100644 index 0000000000..22b42bbbd0 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -0,0 +1,194 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.net.InetAddress +import java.util.concurrent.Executors + +import scala.collection.AbstractIterator +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Source +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +import io.aeron.Aeron +import io.aeron.driver.MediaDriver + +object AeronStreamMaxThroughputSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (20) + akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor = 1.0 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery.enabled = off + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20511 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20512 + } + + final case class TestSettings( + testName: String, + totalMessages: Long, + payloadSize: Int) + + def iterate(start: Long, end: Long): Iterator[Long] = new AbstractIterator[Long] { + private[this] var first = true + private[this] var acc = start + def hasNext: Boolean = acc < end + def next(): Long = { + if (!hasNext) throw new NoSuchElementException("next on empty iterator") + if (first) first = false + else acc += 1 + + acc + } + } +} + +class AeronStreamMaxThroughputSpecMultiJvmNode1 extends AeronStreamMaxThroughputSpec +class AeronStreamMaxThroughputSpecMultiJvmNode2 extends AeronStreamMaxThroughputSpec + +abstract class AeronStreamMaxThroughputSpec + extends MultiNodeSpec(AeronStreamMaxThroughputSpec) + with STMultiNodeSpec with ImplicitSender { + + import AeronStreamMaxThroughputSpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor") + + val aeron = { + val ctx = new Aeron.Context + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + lazy implicit val mat = ActorMaterializer()(system) + import system.dispatcher + + def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong + + override def initialParticipants = roles.size + + def channel(roleName: RoleName) = { + val a = node(roleName).address + s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}" + } + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def printTotal(testName: String, total: Long, startTime: Long, payloadSize: Long): Unit = { + val d = (System.nanoTime - startTime).nanos.toMillis + println(f"=== AeronStreamMaxThroughput $testName: " + + f"${1000.0 * total / d}%.03g msg/s, ${1000.0 * total * payloadSize / d}%.03g bytes/s, " + + s"payload size $payloadSize, " + + s"$d ms to deliver $total messages") + } + + val scenarios = List( + TestSettings( + testName = "size-100", + totalMessages = adjustedTotalMessages(1000000), + payloadSize = 100), + TestSettings( + testName = "size-1k", + totalMessages = adjustedTotalMessages(100000), + payloadSize = 1000), + TestSettings( + testName = "size-10k", + totalMessages = adjustedTotalMessages(10000), + payloadSize = 10000)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + val receiverName = testName + "-rcv" + + runOn(second) { + val rep = reporter(testName) + var t0 = System.nanoTime() + var count = 0L + val done = TestLatch(1) + Source.fromGraph(new AeronSource(channel(second), aeron)) + .runForeach { bytes ⇒ + rep.onMessage(1, bytes.length) + count += 1 + if (count == 1) { + t0 = System.nanoTime() + } else if (count == totalMessages) { + printTotal(testName, totalMessages, t0, payloadSize) + done.countDown() + } + }.onFailure { + case e ⇒ + e.printStackTrace + } + + enterBarrier(receiverName + "-started") + Await.ready(done, barrierTimeout) + rep.halt() + enterBarrier(testName + "-done") + } + + runOn(first) { + enterBarrier(receiverName + "-started") + + val payload = ("0" * payloadSize).getBytes("utf-8") + val t0 = System.nanoTime() + Source.fromIterator(() ⇒ iterate(1, totalMessages)) + .map { n ⇒ payload } + .runWith(new AeronSink(channel(second), aeron)) + + enterBarrier(testName + "-done") + } + + enterBarrier("after-" + testName) + } + + "Max throughput of Aeron Streams" must { + + for (s ← scenarios) { + s"be great for ${s.testName}, payloadSize = ${s.payloadSize}" in test(s) + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala new file mode 100644 index 0000000000..3cc9fca890 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -0,0 +1,270 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.net.InetAddress +import java.util.concurrent.Executors +import scala.collection.AbstractIterator +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Source +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import io.aeron.Aeron +import io.aeron.driver.MediaDriver +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLongArray +import org.HdrHistogram.Histogram +import akka.stream.ThrottleMode +import java.io.StringWriter +import java.io.PrintStream +import java.io.OutputStreamWriter +import java.io.BufferedOutputStream +import java.io.ByteArrayOutputStream + +object LatencySpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3) + akka.test.LatencySpec.totalMessagesFactor = 1.0 + akka.test.LatencySpec.repeatCount = 1 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery { + enabled = on + } + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20501 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20502 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + final case object Reset + + def echoProps(): Props = + Props(new Echo) + + class Echo extends Actor { + // FIXME to avoid using new RemoteActorRef each time + var cachedSender: ActorRef = null + + def receive = { + case Reset ⇒ + cachedSender = null + sender() ! Reset + case msg ⇒ + if (cachedSender == null) cachedSender = sender() + cachedSender ! msg + } + } + + def receiverProps(reporter: RateReporter, settings: TestSettings, totalMessages: Int, + sendTimes: AtomicLongArray, histogram: Histogram): Props = + Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram)) + + class Receiver(reporter: RateReporter, settings: TestSettings, totalMessages: Int, + sendTimes: AtomicLongArray, histogram: Histogram) extends Actor { + import settings._ + + var count = 0 + + def receive = { + case bytes: Array[Byte] ⇒ + if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message") + reporter.onMessage(1, payloadSize) + count += 1 + val d = System.nanoTime() - sendTimes.get(count - 1) + histogram.recordValue(d) + if (count == totalMessages) { + printTotal(testName, bytes.length, histogram) + context.stop(self) + } + } + + def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = { + import scala.collection.JavaConverters._ + val percentiles = histogram.percentiles(5) + def percentile(p: Double): Double = + percentiles.iterator().asScala.collectFirst { + case value if (p - 0.5) < value.getPercentileLevelIteratedTo && + value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0 + }.getOrElse(Double.NaN) + + println(s"=== Latency $testName: RTT " + + f"50%%ile: ${percentile(50.0)}%.0f µs, " + + f"90%%ile: ${percentile(90.0)}%.0f µs, " + + f"99%%ile: ${percentile(99.0)}%.0f µs, ") + println("Histogram of RTT latencies in microseconds.") + histogram.outputPercentileDistribution(System.out, 1000.0) + } + } + + final case class TestSettings( + testName: String, + messageRate: Int, // msg/s + payloadSize: Int, + repeat: Int) + +} + +class LatencySpecMultiJvmNode1 extends LatencySpec +class LatencySpecMultiJvmNode2 extends LatencySpec + +abstract class LatencySpec + extends MultiNodeSpec(LatencySpec) + with STMultiNodeSpec with ImplicitSender { + + import LatencySpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor") + val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount") + + val aeron = { + val ctx = new Aeron.Context + val driver = MediaDriver.launchEmbedded() + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + lazy implicit val mat = ActorMaterializer()(system) + import system.dispatcher + + override def initialParticipants = roles.size + + def channel(roleName: RoleName) = { + val a = node(roleName).address + s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}" + } + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def identifyEcho(name: String = "echo", r: RoleName = second): ActorRef = { + system.actorSelection(node(r) / "user" / name) ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + val scenarios = List( + TestSettings( + testName = "rate-100-size-100", + messageRate = 100, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-100", + messageRate = 1000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-10000-size-100", + messageRate = 10000, + payloadSize = 100, + repeat = repeatCount), + TestSettings( + testName = "rate-1000-size-1k", + messageRate = 1000, + payloadSize = 1000, + repeat = repeatCount)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + + runOn(first) { + val payload = ("0" * payloadSize).getBytes("utf-8") + // by default run for 2 seconds, but can be adjusted with the totalMessagesFactor + val totalMessages = (2 * messageRate * totalMessagesFactor).toInt + val sendTimes = new AtomicLongArray(totalMessages) + val histogram = new Histogram(SECONDS.toNanos(10), 3) + val rep = reporter(testName) + + val echo = identifyEcho() + + for (n ← 1 to repeat) { + echo ! Reset + expectMsg(Reset) + histogram.reset() + val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram)) + + Source(1 to totalMessages) + .throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping) + .runForeach { n ⇒ + sendTimes.set(n - 1, System.nanoTime()) + echo.tell(payload, receiver) + } + + watch(receiver) + expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds) + } + + rep.halt() + } + + enterBarrier("after-" + testName) + } + + "Latency of Artery" must { + + "start echo" in { + runOn(second) { + // just echo back + system.actorOf(echoProps, "echo") + } + enterBarrier("echo-started") + } + + for (s ← scenarios) { + s"be low for ${s.testName}, at ${s.messageRate} msg/s, payloadSize = ${s.payloadSize}" in test(s) + } + + // TODO add more tests + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala new file mode 100644 index 0000000000..e56a6f1ec2 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -0,0 +1,277 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit.NANOSECONDS +import scala.concurrent.duration._ +import akka.actor._ +import akka.remote.RemoteActorRefProvider +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import java.net.InetAddress + +object MaxThroughputSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + val barrierTimeout = 5.minutes + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (20) + akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0 + akka { + loglevel = ERROR + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor { + provider = "akka.remote.RemoteActorRefProvider" + serialize-creators = false + serialize-messages = false + } + remote.artery { + enabled = on + } + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20501 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20502 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + case object Run + sealed trait Echo extends DeadLetterSuppression + final case object Start extends Echo + final case object End extends Echo + final case class EndResult(totalReceived: Long) + final case class FlowControl(burstStartTime: Long) extends Echo + + def receiverProps(reporter: RateReporter, payloadSize: Int): Props = + Props(new Receiver(reporter, payloadSize)) + + class Receiver(reporter: RateReporter, payloadSize: Int) extends Actor { + var c = 0L + + def receive = { + case Start ⇒ + c = 0 + sender() ! Start + case End ⇒ + sender() ! EndResult(c) + context.stop(self) + case m: Echo ⇒ + sender() ! m + case msg: Array[Byte] ⇒ + if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message") + reporter.onMessage(1, payloadSize) + c += 1 + } + } + + def senderProps(target: ActorRef, testSettings: TestSettings): Props = + Props(new Sender(target, testSettings)) + + class Sender(target: ActorRef, testSettings: TestSettings) extends Actor { + import testSettings._ + val payload = ("0" * testSettings.payloadSize).getBytes("utf-8") + var startTime = 0L + var remaining = totalMessages + var maxRoundTripMillis = 0L + + def receive = { + case Run ⇒ + // first some warmup + sendBatch() + // then Start, which will echo back here + target ! Start + + case Start ⇒ + println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " + + s"$burstSize and payload size $payloadSize") + startTime = System.nanoTime + remaining = totalMessages + // have a few batches in flight to make sure there are always messages to send + (1 to 3).foreach { _ ⇒ + val t0 = System.nanoTime() + sendBatch() + sendFlowControl(t0) + } + + case c @ FlowControl(t0) ⇒ + val now = System.nanoTime() + val duration = NANOSECONDS.toMillis(now - t0) + maxRoundTripMillis = math.max(maxRoundTripMillis, duration) + + sendBatch() + sendFlowControl(now) + + case EndResult(totalReceived) ⇒ + val took = NANOSECONDS.toMillis(System.nanoTime - startTime) + val throughtput = (totalReceived * 1000.0 / took).toInt + println( + s"=== MaxThroughput ${self.path.name}: " + + s"throughtput $throughtput msg/s, " + + s"dropped ${totalMessages - totalReceived}, " + + s"max round-trip $maxRoundTripMillis ms, " + + s"burst size $burstSize, " + + s"payload size $payloadSize, " + + s"$took ms to deliver $totalReceived messages") + context.stop(self) + } + + def sendBatch(): Unit = { + val batchSize = math.min(remaining, burstSize) + var i = 0 + while (i < batchSize) { + target ! payload + i += 1 + } + remaining -= batchSize + } + + def sendFlowControl(t0: Long): Unit = { + if (remaining <= 0) + target ! End + else + target ! FlowControl(t0) + } + } + + final case class TestSettings( + testName: String, + totalMessages: Long, + burstSize: Int, + payloadSize: Int, + senderReceiverPairs: Int) + +} + +class MaxThroughputSpecMultiJvmNode1 extends MaxThroughputSpec +class MaxThroughputSpecMultiJvmNode2 extends MaxThroughputSpec + +abstract class MaxThroughputSpec + extends MultiNodeSpec(MaxThroughputSpec) + with STMultiNodeSpec with ImplicitSender { + + import MaxThroughputSpec._ + + val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor") + + def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong + + override def initialParticipants = roles.size + + def remoteSettings = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].remoteSettings + + lazy val reporterExecutor = Executors.newFixedThreadPool(1) + def reporter(name: String): RateReporter = { + val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter { + override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { + println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format( + messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024))) + } + }) + reporterExecutor.execute(r) + r + } + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + super.afterAll() + } + + def identifyReceiver(name: String, r: RoleName = second): ActorRef = { + system.actorSelection(node(r) / "user" / name) ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + val scenarios = List( + TestSettings( + testName = "1-to-1", + totalMessages = adjustedTotalMessages(20000), + burstSize = 1000, + payloadSize = 100, + senderReceiverPairs = 1), + TestSettings( + testName = "1-to-1-size-1k", + totalMessages = adjustedTotalMessages(20000), + burstSize = 1000, + payloadSize = 1000, + senderReceiverPairs = 1), + TestSettings( + testName = "1-to-1-size-10k", + totalMessages = adjustedTotalMessages(10000), + burstSize = 1000, + payloadSize = 10000, + senderReceiverPairs = 1), + TestSettings( + testName = "5-to-5", + totalMessages = adjustedTotalMessages(20000), + burstSize = 1000, + payloadSize = 100, + senderReceiverPairs = 5)) + + def test(testSettings: TestSettings): Unit = { + import testSettings._ + val receiverName = testName + "-rcv" + + runOn(second) { + val rep = reporter(testName) + for (n ← 1 to senderReceiverPairs) { + val receiver = system.actorOf(receiverProps(rep, payloadSize), receiverName + n) + } + enterBarrier(receiverName + "-started") + enterBarrier(testName + "-done") + rep.halt() + } + + runOn(first) { + enterBarrier(receiverName + "-started") + val senders = for (n ← 1 to senderReceiverPairs) yield { + val receiver = identifyReceiver(receiverName + n) + val snd = system.actorOf(senderProps(receiver, testSettings), testName + "-snd" + n) + val p = TestProbe() + p.watch(snd) + snd ! Run + (snd, p) + } + senders.foreach { + case (snd, p) ⇒ + val t = if (snd == senders.head._1) barrierTimeout else 10.seconds + p.expectTerminated(snd, t) + } + enterBarrier(testName + "-done") + } + + enterBarrier("after-" + testName) + } + + "Max throughput of Artery" must { + + for (s ← scenarios) { + s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s) + } + + // TODO add more tests, such as 5-to-5 sender receiver pairs + + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index e2aa5d8538..94e3de17fe 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -75,7 +75,13 @@ akka { artery { enabled = off port = 20200 - hostname = localhost + + # The hostname or ip clients should connect to. + # InetAddress.getLocalHost.getHostAddress is used if empty or + # "" is specified. + # InetAddress.getLocalHost.getHostName is used if + # "" is specified. + hostname = "" } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 97806381ff..6493489ba1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -15,6 +15,7 @@ import akka.event.Logging import akka.event.Logging.LogLevel import akka.ConfigurationException import java.util.Locale +import java.net.InetAddress final class RemoteSettings(val config: Config) { import config._ @@ -22,7 +23,11 @@ final class RemoteSettings(val config: Config) { val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled") val ArteryPort: Int = getInt("akka.remote.artery.port") - val ArteryHostname: String = getString("akka.remote.artery.hostname") + val ArteryHostname: String = getString("akka.remote.artery.hostname") match { + case "" | "" ⇒ InetAddress.getLocalHost.getHostAddress + case "" ⇒ InetAddress.getLocalHost.getHostName + case other ⇒ other + } val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index 06296c6e78..d4261ea99d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -73,8 +73,6 @@ class AeronSink(channel: String, aeron: Aeron) extends GraphStage[SinkShape[Aero // retrying/polling to a separate thread that performs the polling for // all sources/sinks and notifies back when there is some news. backoffCount -= 1 - if (backoffCount <= 10) - println(s"# snd backoff $backoffCount") // FIXME if (backoffCount <= 10) scheduleOnce(Backoff, 50.millis) else diff --git a/akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java b/akka-remote/src/test/java/akka/remote/artery/RateReporter.java similarity index 99% rename from akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java rename to akka-remote/src/test/java/akka/remote/artery/RateReporter.java index e042e58bab..0e455fc0ab 100644 --- a/akka-bench-jmh/src/main/java/akka/aeron/RateReporter.java +++ b/akka-remote/src/test/java/akka/remote/artery/RateReporter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package akka.aeron; +package akka.remote.artery; import java.util.concurrent.locks.LockSupport; diff --git a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala similarity index 75% rename from akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala rename to akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala index 9857db6e69..ce25f329a0 100644 --- a/akka-bench-jmh/src/main/scala/akka/aeron/AeronStreams.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala @@ -1,4 +1,4 @@ -package akka.aeron +package akka.remote.artery import scala.concurrent.duration._ import akka.actor.ActorSystem @@ -17,10 +17,13 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.atomic.AtomicLongArray import akka.stream.ThrottleMode -import akka.remote.artery.AeronSink -import akka.remote.artery.AeronSource +import org.agrona.ErrorHandler +import io.aeron.AvailableImageHandler +import io.aeron.UnavailableImageHandler +import io.aeron.Image +import io.aeron.AvailableImageHandler -object AeronStreams { +object AeronStreamsApp { val channel1 = "aeron:udp?endpoint=localhost:40123" val channel2 = "aeron:udp?endpoint=localhost:40124" @@ -32,7 +35,27 @@ object AeronStreams { lazy val aeron = { val ctx = new Aeron.Context - val driver = MediaDriver.launchEmbedded() + ctx.errorHandler(new ErrorHandler { + override def onError(cause: Throwable) { + println(s"# Aeron onError " + cause) // FIXME + } + }) + ctx.availableImageHandler(new AvailableImageHandler { + override def onAvailableImage(img: Image): Unit = { + println(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") + } + }) + ctx.unavailableImageHandler(new UnavailableImageHandler { + override def onUnavailableImage(img: Image): Unit = { + println(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") + } + }) + + val driverContext = new MediaDriver.Context + driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.driverTimeoutMs(SECONDS.toNanos(10)) + val driver = MediaDriver.launchEmbedded(driverContext) ctx.aeronDirectoryName(driver.aeronDirectoryName) Aeron.connect(ctx) } @@ -111,12 +134,12 @@ object AeronStreams { var t0 = System.nanoTime() var count = 0L var payloadSize = 0L - Source.fromGraph(new AeronSource(channel1, () => aeron)) - .map { bytes => + Source.fromGraph(new AeronSource(channel1, aeron)) + .map { bytes ⇒ r.onMessage(1, bytes.length) bytes } - .runForeach { bytes => + .runForeach { bytes ⇒ count += 1 if (count == 1) { t0 = System.nanoTime() @@ -126,7 +149,7 @@ object AeronStreams { printTotal(throughputN, "receive", t0, payloadSize) } }.onFailure { - case e => + case e ⇒ e.printStackTrace exit(-1) } @@ -138,30 +161,30 @@ object AeronStreams { val r = reporter val t0 = System.nanoTime() Source(1 to throughputN) - .map { n => + .map { n ⇒ if (n == throughputN) { exit(0) printTotal(throughputN, "send", t0, payload.length) } n } - .map { _ => + .map { _ ⇒ r.onMessage(1, payload.length) payload } - .runWith(new AeronSink(channel1, () => aeron)) + .runWith(new AeronSink(channel1, aeron)) } def runEchoReceiver(): Unit = { // just echo back on channel2 reporterExecutor.execute(reporter) val r = reporter - Source.fromGraph(new AeronSource(channel1, () => aeron)) - .map { bytes => + Source.fromGraph(new AeronSource(channel1, aeron)) + .map { bytes ⇒ r.onMessage(1, bytes.length) bytes } - .runWith(new AeronSink(channel2, () => aeron)) + .runWith(new AeronSink(channel2, aeron)) } def runEchoSender(): Unit = { @@ -173,12 +196,12 @@ object AeronStreams { var repeat = 3 val count = new AtomicInteger var t0 = System.nanoTime() - Source.fromGraph(new AeronSource(channel2, () => aeron)) - .map { bytes => + Source.fromGraph(new AeronSource(channel2, aeron)) + .map { bytes ⇒ r.onMessage(1, bytes.length) bytes } - .runForeach { bytes => + .runForeach { bytes ⇒ val c = count.incrementAndGet() val d = System.nanoTime() - sendTimes.get(c - 1) if (c % (latencyN / 10) == 0) @@ -189,7 +212,7 @@ object AeronStreams { barrier.await() // this is always the last party } }.onFailure { - case e => + case e ⇒ e.printStackTrace exit(-1) } @@ -202,13 +225,13 @@ object AeronStreams { Source(1 to latencyN) .throttle(latencyRate, 1.second, latencyRate / 10, ThrottleMode.Shaping) - .map { n => + .map { n ⇒ if (n % (latencyN / 10) == 0) println(s"# send offset $n") // FIXME sendTimes.set(n - 1, System.nanoTime()) payload } - .runWith(new AeronSink(channel1, () => aeron)) + .runWith(new AeronSink(channel1, aeron)) barrier.await() } @@ -218,12 +241,12 @@ object AeronStreams { def runDebugReceiver(): Unit = { import system.dispatcher - Source.fromGraph(new AeronSource(channel1, () => aeron)) - .map(bytes => new String(bytes, "utf-8")) - .runForeach { s => + Source.fromGraph(new AeronSource(channel1, aeron)) + .map(bytes ⇒ new String(bytes, "utf-8")) + .runForeach { s ⇒ println(s) }.onFailure { - case e => + case e ⇒ e.printStackTrace exit(-1) } @@ -233,12 +256,12 @@ object AeronStreams { val fill = "0000" Source(1 to 1000) .throttle(1, 1.second, 1, ThrottleMode.Shaping) - .map { n => + .map { n ⇒ val s = (fill + n.toString).takeRight(4) println(s) s.getBytes("utf-8") } - .runWith(new AeronSink(channel1, () => aeron)) + .runWith(new AeronSink(channel1, aeron)) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 34013720c6..b6c320206c 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -1,5 +1,6 @@ package akka.remote.artery +import scala.concurrent.duration._ import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSystem, Identify, Props, RootActorPath } import akka.testkit.{ AkkaSpec, ImplicitSender } import com.typesafe.config.ConfigFactory @@ -12,6 +13,7 @@ object RemoteSendConsistencySpec { akka { actor.provider = "akka.remote.RemoteActorRefProvider" remote.artery.enabled = on + remote.artery.hostname = localhost } """ @@ -63,8 +65,8 @@ class RemoteSendConsistencySpec extends AkkaSpec(commonConfig) with ImplicitSend } val senderProps = Props(new Actor { - var counter = 1000 - remoteRef ! 1000 + var counter = 100 // FIXME try this test with 1000, why does it take so long? + remoteRef ! counter override def receive: Receive = { case i: Int ⇒ @@ -84,10 +86,12 @@ class RemoteSendConsistencySpec extends AkkaSpec(commonConfig) with ImplicitSend system.actorOf(senderProps) system.actorOf(senderProps) - expectMsg("success") - expectMsg("success") - expectMsg("success") - expectMsg("success") + within(10.seconds) { + expectMsg("success") + expectMsg("success") + expectMsg("success") + expectMsg("success") + } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala index 8c7e983d97..943f020050 100644 --- a/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala @@ -41,8 +41,8 @@ private[akka] class HdrHistogram( private def wrapHistogramOutOfBoundsException(value: Long, ex: ArrayIndexOutOfBoundsException): IllegalArgumentException = new IllegalArgumentException(s"Given value $value can not be stored in this histogram " + - s"(min: ${hist.getLowestTrackableValue}, max: ${hist.getHighestTrackableValue}})", ex) + s"(min: ${hist.getLowestDiscernibleValue}, max: ${hist.getHighestTrackableValue}})", ex) - def getData = hist.copy().getHistogramData + def getData = hist.copy() } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 1530ee7d50..2973c04915 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -132,7 +132,7 @@ object AkkaBuild extends Build { lazy val remoteTests = Project( id = "akka-remote-tests", base = file("akka-remote-tests"), - dependencies = Seq(actorTests % "test->test", multiNodeTestkit) + dependencies = Seq(actorTests % "test->test", remote % "test->test", multiNodeTestkit) ) configs (MultiJvm) lazy val cluster = Project( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b55dc7d954..7e09655211 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -69,7 +69,6 @@ object Dependencies { val aeronDriver = "io.aeron" % "aeron-driver" % "0.9.5" // ApacheV2 val aeronClient = "io.aeron" % "aeron-client" % "0.9.5" // ApacheV2 - val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" // CC0 object Docs { val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test" @@ -96,7 +95,7 @@ object Dependencies { val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2 val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.2" % "test" // ApacheV2 val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD - val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.1.4" % "test" // CC0 + val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" % "test" // CC0 val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram) // sigar logging @@ -165,7 +164,7 @@ object Dependencies { val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, hdrHistogram) + val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative) // akka stream & http