Merge pull request #20328 from akka/wip-artery-benchmarks-patriknw

add Artery throughput and latency tests
Patrik Nordwall 2016-04-22 17:20:25 +02:00
commit eb8f54c3c3
18 changed files with 1198 additions and 79 deletions
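The new specs are ordinary MultiJvm tests, so — assuming the standard sbt-multi-jvm setup that akka-remote-tests already uses (it is configured with `configs (MultiJvm)` in the build) — a local run of one of the benchmarks would look roughly like this from the sbt prompt; the exact task name depends on the sbt/plugin version:

akka-remote-tests/multi-jvm:test-only akka.remote.artery.MaxThroughputSpec
akka-remote-tests/multi-jvm:test-only akka.remote.artery.LatencySpec

For serious measurements the config comments in the specs below suggest raising totalMessagesFactor and repeatCount first.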

View file

@ -196,6 +196,7 @@ object MultiNodeSpec {
private[testkit] val nodeConfig = mapToConfig(Map(
"akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
"akka.remote.artery.hostname" -> selfName,
"akka.remote.netty.tcp.hostname" -> selfName,
"akka.remote.netty.tcp.port" -> selfPort))

View file

@ -0,0 +1,228 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLongArray
import scala.concurrent.duration._
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source
import akka.testkit._
import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import org.HdrHistogram.Histogram
import java.util.concurrent.atomic.AtomicBoolean
object AeronStreamLatencySpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val barrierTimeout = 5.minutes
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3)
akka.test.AeronStreamLatencySpec.totalMessagesFactor = 1.0
akka.test.AeronStreamLatencySpec.repeatCount = 1
akka {
loglevel = ERROR
testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s
actor {
provider = "akka.remote.RemoteActorRefProvider"
serialize-creators = false
serialize-messages = false
}
remote.artery.enabled = off
}
""")))
def aeronPort(roleName: RoleName): Int =
roleName match {
case `first` ⇒ 20521 // TODO yeah, we should have support for dynamic port assignment
case `second` ⇒ 20522
}
final case class TestSettings(
testName: String,
messageRate: Int, // msg/s
payloadSize: Int,
repeat: Int)
}
class AeronStreamLatencySpecMultiJvmNode1 extends AeronStreamLatencySpec
class AeronStreamLatencySpecMultiJvmNode2 extends AeronStreamLatencySpec
abstract class AeronStreamLatencySpec
extends MultiNodeSpec(AeronStreamLatencySpec)
with STMultiNodeSpec with ImplicitSender {
import AeronStreamLatencySpec._
val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamLatencySpec.totalMessagesFactor")
val repeatCount = system.settings.config.getInt("akka.test.AeronStreamLatencySpec.repeatCount")
var plots = LatencyPlots()
val aeron = {
val ctx = new Aeron.Context
val driver = MediaDriver.launchEmbedded()
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
lazy implicit val mat = ActorMaterializer()(system)
import system.dispatcher
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val a = node(roleName).address
s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}"
}
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): RateReporter = {
val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter {
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format(
messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024)))
}
})
reporterExecutor.execute(r)
r
}
override def afterAll(): Unit = {
reporterExecutor.shutdown()
runOn(first) {
println(plots.plot50.csv(system.name + "50"))
println(plots.plot90.csv(system.name + "90"))
println(plots.plot99.csv(system.name + "99"))
}
super.afterAll()
}
def printTotal(testName: String, payloadSize: Long, histogram: Histogram, lastRepeat: Boolean): Unit = {
import scala.collection.JavaConverters._
val percentiles = histogram.percentiles(5)
def percentile(p: Double): Double =
percentiles.iterator().asScala.collectFirst {
case value if (p - 0.5) < value.getPercentileLevelIteratedTo &&
value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0
}.getOrElse(Double.NaN)
println(s"=== AeronStreamLatency $testName: RTT " +
f"50%%ile: ${percentile(50.0)}%.0f µs, " +
f"90%%ile: ${percentile(90.0)}%.0f µs, " +
f"99%%ile: ${percentile(99.0)}%.0f µs, ")
println("Histogram of RTT latencies in microseconds.")
histogram.outputPercentileDistribution(System.out, 1000.0)
// only use the last repeat for the plots
if (lastRepeat) {
plots = plots.copy(
plot50 = plots.plot50.add(testName, percentile(50.0)),
plot90 = plots.plot90.add(testName, percentile(90.0)),
plot99 = plots.plot99.add(testName, percentile(99.0)))
}
}
val scenarios = List(
TestSettings(
testName = "rate-100-size-100",
messageRate = 100,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-1000-size-100",
messageRate = 1000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-10000-size-100",
messageRate = 10000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-1000-size-1k",
messageRate = 1000,
payloadSize = 1000,
repeat = repeatCount))
def test(testSettings: TestSettings): Unit = {
import testSettings._
runOn(first) {
val payload = ("0" * payloadSize).getBytes("utf-8")
// by default run for 2 seconds, but can be adjusted with the totalMessagesFactor
val totalMessages = (2 * messageRate * totalMessagesFactor).toInt
val sendTimes = new AtomicLongArray(totalMessages)
val histogram = new Histogram(SECONDS.toNanos(10), 3)
val rep = reporter(testName)
val barrier = new CyclicBarrier(2)
val count = new AtomicInteger
val lastRepeat = new AtomicBoolean(false)
Source.fromGraph(new AeronSource(channel(first), aeron))
.runForeach { bytes ⇒
if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message")
rep.onMessage(1, payloadSize)
val c = count.incrementAndGet()
val d = System.nanoTime() - sendTimes.get(c - 1)
histogram.recordValue(d)
if (c == totalMessages) {
printTotal(testName, bytes.length, histogram, lastRepeat.get)
barrier.await() // this is always the last party
}
}
for (n ← 1 to repeat) {
histogram.reset()
count.set(0)
lastRepeat.set(n == repeat)
Source(1 to totalMessages)
.throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
.map { n ⇒
sendTimes.set(n - 1, System.nanoTime())
payload
}
.runWith(new AeronSink(channel(second), aeron))
barrier.await((totalMessages / messageRate) + 10, SECONDS)
}
rep.halt()
}
enterBarrier("after-" + testName)
}
"Latency of Aeron Streams" must {
"start echo" in {
runOn(second) {
// just echo back
Source.fromGraph(new AeronSource(channel(second), aeron))
.runWith(new AeronSink(channel(first), aeron))
}
enterBarrier("echo-started")
}
for (s ← scenarios) {
s"be low for ${s.testName}, at ${s.messageRate} msg/s, payloadSize = ${s.payloadSize}" in test(s)
}
}
}

View file

@ -0,0 +1,202 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.net.InetAddress
import java.util.concurrent.Executors
import scala.collection.AbstractIterator
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.testkit._
import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
object AeronStreamMaxThroughputSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val barrierTimeout = 5.minutes
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor = 1.0
akka {
loglevel = ERROR
testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s
actor {
provider = "akka.remote.RemoteActorRefProvider"
serialize-creators = false
serialize-messages = false
}
remote.artery.enabled = off
}
""")))
def aeronPort(roleName: RoleName): Int =
roleName match {
case `first` ⇒ 20511 // TODO yeah, we should have support for dynamic port assignment
case `second` ⇒ 20512
}
final case class TestSettings(
testName: String,
totalMessages: Long,
payloadSize: Int)
def iterate(start: Long, end: Long): Iterator[Long] = new AbstractIterator[Long] {
private[this] var first = true
private[this] var acc = start
def hasNext: Boolean = acc < end
def next(): Long = {
if (!hasNext) throw new NoSuchElementException("next on empty iterator")
if (first) first = false
else acc += 1
acc
}
}
}
class AeronStreamMaxThroughputSpecMultiJvmNode1 extends AeronStreamMaxThroughputSpec
class AeronStreamMaxThroughputSpecMultiJvmNode2 extends AeronStreamMaxThroughputSpec
abstract class AeronStreamMaxThroughputSpec
extends MultiNodeSpec(AeronStreamMaxThroughputSpec)
with STMultiNodeSpec with ImplicitSender {
import AeronStreamMaxThroughputSpec._
val totalMessagesFactor = system.settings.config.getDouble("akka.test.AeronStreamMaxThroughputSpec.totalMessagesFactor")
var plot = PlotResult()
val aeron = {
val ctx = new Aeron.Context
val driver = MediaDriver.launchEmbedded()
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
lazy implicit val mat = ActorMaterializer()(system)
import system.dispatcher
def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val a = node(roleName).address
s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}"
}
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): RateReporter = {
val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter {
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format(
messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024)))
}
})
reporterExecutor.execute(r)
r
}
override def afterAll(): Unit = {
reporterExecutor.shutdown()
runOn(second) {
println(plot.csv(system.name))
}
super.afterAll()
}
def printTotal(testName: String, total: Long, startTime: Long, payloadSize: Long): Unit = {
val d = (System.nanoTime - startTime).nanos.toMillis
val throughput = 1000.0 * total / d
println(f"=== AeronStreamMaxThroughput $testName: " +
f"${throughput}%.03g msg/s, ${throughput * payloadSize}%.03g bytes/s, " +
s"payload size $payloadSize, " +
s"$d ms to deliver $total messages")
plot = plot.add(testName, throughput * payloadSize / 1024 / 1024)
}
val scenarios = List(
TestSettings(
testName = "size-100",
totalMessages = adjustedTotalMessages(1000000),
payloadSize = 100),
TestSettings(
testName = "size-1k",
totalMessages = adjustedTotalMessages(100000),
payloadSize = 1000),
TestSettings(
testName = "size-10k",
totalMessages = adjustedTotalMessages(10000),
payloadSize = 10000))
def test(testSettings: TestSettings): Unit = {
import testSettings._
val receiverName = testName + "-rcv"
runOn(second) {
val rep = reporter(testName)
var t0 = System.nanoTime()
var count = 0L
val done = TestLatch(1)
Source.fromGraph(new AeronSource(channel(second), aeron))
.runForeach { bytes ⇒
rep.onMessage(1, bytes.length)
count += 1
if (count == 1) {
t0 = System.nanoTime()
} else if (count == totalMessages) {
printTotal(testName, totalMessages, t0, payloadSize)
done.countDown()
}
}.onFailure {
case e ⇒
e.printStackTrace
}
enterBarrier(receiverName + "-started")
Await.ready(done, barrierTimeout)
rep.halt()
enterBarrier(testName + "-done")
}
runOn(first) {
enterBarrier(receiverName + "-started")
val payload = ("0" * payloadSize).getBytes("utf-8")
val t0 = System.nanoTime()
Source.fromIterator(() ⇒ iterate(1, totalMessages))
.map { n ⇒ payload }
.runWith(new AeronSink(channel(second), aeron))
enterBarrier(testName + "-done")
}
enterBarrier("after-" + testName)
}
"Max throughput of Aeron Streams" must {
for (s ← scenarios) {
s"be great for ${s.testName}, payloadSize = ${s.payloadSize}" in test(s)
}
}
}

View file

@ -0,0 +1,292 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.net.InetAddress
import java.util.concurrent.Executors
import scala.collection.AbstractIterator
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.testkit._
import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLongArray
import org.HdrHistogram.Histogram
import akka.stream.ThrottleMode
import java.io.StringWriter
import java.io.PrintStream
import java.io.OutputStreamWriter
import java.io.BufferedOutputStream
import java.io.ByteArrayOutputStream
object LatencySpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val barrierTimeout = 5.minutes
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (10) and repeatCount (3)
akka.test.LatencySpec.totalMessagesFactor = 1.0
akka.test.LatencySpec.repeatCount = 1
akka {
loglevel = ERROR
testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s
actor {
provider = "akka.remote.RemoteActorRefProvider"
serialize-creators = false
serialize-messages = false
}
remote.artery {
enabled = on
}
}
""")))
def aeronPort(roleName: RoleName): Int =
roleName match {
case `first` ⇒ 20501 // TODO yeah, we should have support for dynamic port assignment
case `second` ⇒ 20502
}
nodeConfig(first) {
ConfigFactory.parseString(s"""
akka.remote.artery.port = ${aeronPort(first)}
""")
}
nodeConfig(second) {
ConfigFactory.parseString(s"""
akka.remote.artery.port = ${aeronPort(second)}
""")
}
final case object Reset
def echoProps(): Props =
Props(new Echo)
class Echo extends Actor {
// FIXME to avoid using new RemoteActorRef each time
var cachedSender: ActorRef = null
def receive = {
case Reset ⇒
cachedSender = null
sender() ! Reset
case msg ⇒
if (cachedSender == null) cachedSender = sender()
cachedSender ! msg
}
}
def receiverProps(reporter: RateReporter, settings: TestSettings, totalMessages: Int,
sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef): Props =
Props(new Receiver(reporter, settings, totalMessages, sendTimes, histogram, plotsRef))
class Receiver(reporter: RateReporter, settings: TestSettings, totalMessages: Int,
sendTimes: AtomicLongArray, histogram: Histogram, plotsRef: ActorRef) extends Actor {
import settings._
var count = 0
def receive = {
case bytes: Array[Byte] ⇒
if (bytes.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
count += 1
val d = System.nanoTime() - sendTimes.get(count - 1)
histogram.recordValue(d)
if (count == totalMessages) {
printTotal(testName, bytes.length, histogram)
context.stop(self)
}
}
def printTotal(testName: String, payloadSize: Long, histogram: Histogram): Unit = {
import scala.collection.JavaConverters._
val percentiles = histogram.percentiles(5)
def percentile(p: Double): Double =
percentiles.iterator().asScala.collectFirst {
case value if (p - 0.5) < value.getPercentileLevelIteratedTo &&
value.getPercentileLevelIteratedTo < (p + 0.5) ⇒ value.getValueIteratedTo / 1000.0
}.getOrElse(Double.NaN)
println(s"=== Latency $testName: RTT " +
f"50%%ile: ${percentile(50.0)}%.0f µs, " +
f"90%%ile: ${percentile(90.0)}%.0f µs, " +
f"99%%ile: ${percentile(99.0)}%.0f µs, ")
println("Histogram of RTT latencies in microseconds.")
histogram.outputPercentileDistribution(System.out, 1000.0)
val plots = LatencyPlots(
PlotResult().add(testName, percentile(50.0)),
PlotResult().add(testName, percentile(90.0)),
PlotResult().add(testName, percentile(99.0)))
plotsRef ! plots
}
}
final case class TestSettings(
testName: String,
messageRate: Int, // msg/s
payloadSize: Int,
repeat: Int)
}
class LatencySpecMultiJvmNode1 extends LatencySpec
class LatencySpecMultiJvmNode2 extends LatencySpec
abstract class LatencySpec
extends MultiNodeSpec(LatencySpec)
with STMultiNodeSpec with ImplicitSender {
import LatencySpec._
val totalMessagesFactor = system.settings.config.getDouble("akka.test.LatencySpec.totalMessagesFactor")
val repeatCount = system.settings.config.getInt("akka.test.LatencySpec.repeatCount")
var plots = LatencyPlots()
val aeron = {
val ctx = new Aeron.Context
val driver = MediaDriver.launchEmbedded()
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
lazy implicit val mat = ActorMaterializer()(system)
import system.dispatcher
override def initialParticipants = roles.size
def channel(roleName: RoleName) = {
val a = node(roleName).address
s"aeron:udp?endpoint=${a.host.get}:${aeronPort(roleName)}"
}
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): RateReporter = {
val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter {
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format(
messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024)))
}
})
reporterExecutor.execute(r)
r
}
override def afterAll(): Unit = {
reporterExecutor.shutdown()
runOn(first) {
println(plots.plot50.csv(system.name + "50"))
println(plots.plot90.csv(system.name + "90"))
println(plots.plot99.csv(system.name + "99"))
}
super.afterAll()
}
def identifyEcho(name: String = "echo", r: RoleName = second): ActorRef = {
system.actorSelection(node(r) / "user" / name) ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
val scenarios = List(
TestSettings(
testName = "rate-100-size-100",
messageRate = 100,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-1000-size-100",
messageRate = 1000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-10000-size-100",
messageRate = 10000,
payloadSize = 100,
repeat = repeatCount),
TestSettings(
testName = "rate-1000-size-1k",
messageRate = 1000,
payloadSize = 1000,
repeat = repeatCount))
def test(testSettings: TestSettings): Unit = {
import testSettings._
runOn(first) {
val payload = ("0" * payloadSize).getBytes("utf-8")
// by default run for 2 seconds, but can be adjusted with the totalMessagesFactor
val totalMessages = (2 * messageRate * totalMessagesFactor).toInt
val sendTimes = new AtomicLongArray(totalMessages)
val histogram = new Histogram(SECONDS.toNanos(10), 3)
val rep = reporter(testName)
val echo = identifyEcho()
val plotProbe = TestProbe()
for (n ← 1 to repeat) {
echo ! Reset
expectMsg(Reset)
histogram.reset()
val receiver = system.actorOf(receiverProps(rep, testSettings, totalMessages, sendTimes, histogram, plotProbe.ref))
Source(1 to totalMessages)
.throttle(messageRate, 1.second, math.max(messageRate / 10, 1), ThrottleMode.Shaping)
.runForeach { n ⇒
sendTimes.set(n - 1, System.nanoTime())
echo.tell(payload, receiver)
}
watch(receiver)
expectTerminated(receiver, ((totalMessages / messageRate) + 10).seconds)
val p = plotProbe.expectMsgType[LatencyPlots]
// only use the last repeat for the plots
if (n == repeat) {
plots = plots.copy(
plot50 = plots.plot50.addAll(p.plot50),
plot90 = plots.plot90.addAll(p.plot90),
plot99 = plots.plot99.addAll(p.plot99))
}
}
rep.halt()
}
enterBarrier("after-" + testName)
}
"Latency of Artery" must {
"start echo" in {
runOn(second) {
// just echo back
system.actorOf(echoProps, "echo")
}
enterBarrier("echo-started")
}
for (s ← scenarios) {
s"be low for ${s.testName}, at ${s.messageRate} msg/s, payloadSize = ${s.payloadSize}" in test(s)
}
// TODO add more tests
}
}

View file

@ -0,0 +1,291 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._
import akka.actor._
import akka.remote.RemoteActorRefProvider
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import java.net.InetAddress
object MaxThroughputSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val barrierTimeout = 5.minutes
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0
akka {
loglevel = ERROR
testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s
actor {
provider = "akka.remote.RemoteActorRefProvider"
serialize-creators = false
serialize-messages = false
}
remote.artery {
enabled = on
}
}
""")))
def aeronPort(roleName: RoleName): Int =
roleName match {
case `first` ⇒ 20501 // TODO yeah, we should have support for dynamic port assignment
case `second` ⇒ 20502
}
nodeConfig(first) {
ConfigFactory.parseString(s"""
akka.remote.artery.port = ${aeronPort(first)}
""")
}
nodeConfig(second) {
ConfigFactory.parseString(s"""
akka.remote.artery.port = ${aeronPort(second)}
""")
}
case object Run
sealed trait Echo extends DeadLetterSuppression
final case object Start extends Echo
final case object End extends Echo
final case class EndResult(totalReceived: Long)
final case class FlowControl(burstStartTime: Long) extends Echo
def receiverProps(reporter: RateReporter, payloadSize: Int): Props =
Props(new Receiver(reporter, payloadSize))
class Receiver(reporter: RateReporter, payloadSize: Int) extends Actor {
var c = 0L
def receive = {
case Start ⇒
c = 0
sender() ! Start
case End ⇒
sender() ! EndResult(c)
context.stop(self)
case m: Echo ⇒
sender() ! m
case msg: Array[Byte] ⇒
if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
c += 1
}
}
def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef): Props =
Props(new Sender(target, testSettings, plotRef))
class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef) extends Actor {
import testSettings._
val payload = ("0" * testSettings.payloadSize).getBytes("utf-8")
var startTime = 0L
var remaining = totalMessages
var maxRoundTripMillis = 0L
def receive = {
case Run ⇒
// first some warmup
sendBatch()
// then Start, which will echo back here
target ! Start
case Start ⇒
println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " +
s"$burstSize and payload size $payloadSize")
startTime = System.nanoTime
remaining = totalMessages
// have a few batches in flight to make sure there are always messages to send
(1 to 3).foreach { _ ⇒
val t0 = System.nanoTime()
sendBatch()
sendFlowControl(t0)
}
case c @ FlowControl(t0) ⇒
val now = System.nanoTime()
val duration = NANOSECONDS.toMillis(now - t0)
maxRoundTripMillis = math.max(maxRoundTripMillis, duration)
sendBatch()
sendFlowControl(now)
case EndResult(totalReceived) ⇒
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = (totalReceived * 1000.0 / took)
println(
s"=== MaxThroughput ${self.path.name}: " +
f"throughput ${throughput}%.03g msg/s, " +
f"${throughput * payloadSize}%.03g bytes/s, " +
s"dropped ${totalMessages - totalReceived}, " +
s"max round-trip $maxRoundTripMillis ms, " +
s"burst size $burstSize, " +
s"payload size $payloadSize, " +
s"$took ms to deliver $totalReceived messages")
plotRef ! PlotResult().add(testName, throughput * payloadSize / 1024 / 1024)
context.stop(self)
}
def sendBatch(): Unit = {
val batchSize = math.min(remaining, burstSize)
var i = 0
while (i < batchSize) {
target ! payload
i += 1
}
remaining -= batchSize
}
def sendFlowControl(t0: Long): Unit = {
if (remaining <= 0)
target ! End
else
target ! FlowControl(t0)
}
}
final case class TestSettings(
testName: String,
totalMessages: Long,
burstSize: Int,
payloadSize: Int,
senderReceiverPairs: Int)
}
class MaxThroughputSpecMultiJvmNode1 extends MaxThroughputSpec
class MaxThroughputSpecMultiJvmNode2 extends MaxThroughputSpec
abstract class MaxThroughputSpec
extends MultiNodeSpec(MaxThroughputSpec)
with STMultiNodeSpec with ImplicitSender {
import MaxThroughputSpec._
val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor")
var plot = PlotResult()
def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong
override def initialParticipants = roles.size
def remoteSettings = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].remoteSettings
lazy val reporterExecutor = Executors.newFixedThreadPool(1)
def reporter(name: String): RateReporter = {
val r = new RateReporter(SECONDS.toNanos(1), new RateReporter.Reporter {
override def onReport(messagesPerSec: Double, bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = {
println(name + ": %.03g msgs/sec, %.03g bytes/sec, totals %d messages %d MB".format(
messagesPerSec, bytesPerSec, totalMessages, totalBytes / (1024 * 1024)))
}
})
reporterExecutor.execute(r)
r
}
override def afterAll(): Unit = {
reporterExecutor.shutdown()
runOn(first) {
println(plot.csv(system.name))
}
super.afterAll()
}
def identifyReceiver(name: String, r: RoleName = second): ActorRef = {
system.actorSelection(node(r) / "user" / name) ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
val scenarios = List(
TestSettings(
testName = "1-to-1",
totalMessages = adjustedTotalMessages(20000),
burstSize = 1000,
payloadSize = 100,
senderReceiverPairs = 1),
TestSettings(
testName = "1-to-1-size-1k",
totalMessages = adjustedTotalMessages(20000),
burstSize = 1000,
payloadSize = 1000,
senderReceiverPairs = 1),
TestSettings(
testName = "1-to-1-size-10k",
totalMessages = adjustedTotalMessages(10000),
burstSize = 1000,
payloadSize = 10000,
senderReceiverPairs = 1),
TestSettings(
testName = "5-to-5",
totalMessages = adjustedTotalMessages(20000),
burstSize = 1000,
payloadSize = 100,
senderReceiverPairs = 5))
def test(testSettings: TestSettings): Unit = {
import testSettings._
val receiverName = testName + "-rcv"
runOn(second) {
val rep = reporter(testName)
for (n ← 1 to senderReceiverPairs) {
val receiver = system.actorOf(receiverProps(rep, payloadSize), receiverName + n)
}
enterBarrier(receiverName + "-started")
enterBarrier(testName + "-done")
rep.halt()
}
runOn(first) {
enterBarrier(receiverName + "-started")
val ignore = TestProbe()
val senders = for (n ← 1 to senderReceiverPairs) yield {
val receiver = identifyReceiver(receiverName + n)
val plotProbe = TestProbe()
val snd = system.actorOf(senderProps(receiver, testSettings, plotProbe.ref),
testName + "-snd" + n)
val terminationProbe = TestProbe()
terminationProbe.watch(snd)
snd ! Run
(snd, terminationProbe, plotProbe)
}
senders.foreach {
case (snd, terminationProbe, plotProbe) ⇒
if (snd == senders.head._1) {
terminationProbe.expectTerminated(snd, barrierTimeout)
val plotResult = plotProbe.expectMsgType[PlotResult]
plot = plot.addAll(plotResult)
} else
terminationProbe.expectTerminated(snd, 10.seconds)
}
enterBarrier(testName + "-done")
}
enterBarrier("after-" + testName)
}
"Max throughput of Artery" must {
for (s ← scenarios) {
s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s)
}
// TODO add more tests, such as 5-to-5 sender receiver pairs
}
}

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
final case class PlotResult(values: Vector[(String, Number)] = Vector.empty) {
def add(key: String, value: Number): PlotResult =
copy(values = values :+ (key -> value))
def addAll(p: PlotResult): PlotResult =
copy(values ++ p.values)
def csvLabels: String = values.map(_._1).mkString("\"", "\",\"", "\"")
def csvValues: String = values.map(_._2).mkString("\"", "\",\"", "\"")
// this can be split to two lines with bash: cut -d':' -f2,3 | tr ':' $'\n'
def csv(name: String): String = s"PLOT_${name}:${csvLabels}:${csvValues}"
}
final case class LatencyPlots(plot50: PlotResult = PlotResult(), plot90: PlotResult = PlotResult(), plot99: PlotResult = PlotResult())
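To make the PLOT_ output above concrete — this is the line that the bash one-liner in the comment splits into a label row and a value row — here is a minimal sketch; the benchmark names and numbers are invented, and "MySystem50" stands in for the system.name + "50" argument used by the latency specs:

// Hypothetical values, only to show the shape of the csv output.
val plot = PlotResult()
  .add("rate-100-size-100", 110.0)
  .add("rate-1000-size-100", 115.0)
println(plot.csv("MySystem50"))
// prints: PLOT_MySystem50:"rate-100-size-100","rate-1000-size-100":"110.0","115.0"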

View file

@ -75,7 +75,13 @@ akka {
artery {
enabled = off
port = 20200
hostname = localhost
# The hostname or ip clients should connect to.
# InetAddress.getLocalHost.getHostAddress is used if empty or
# "<getHostAddress>" is specified.
# InetAddress.getLocalHost.getHostName is used if
# "<getHostName>" is specified.
hostname = "<getHostAddress>"
}
### General settings
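As an illustration of the new hostname handling (a sketch only; the system name and port below are arbitrary), an application that wants Artery to advertise the machine's DNS name instead of its IP address could start its actor system like this:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// "<getHostName>" resolves via InetAddress.getLocalHost.getHostName,
// "<getHostAddress>" (the default) via InetAddress.getLocalHost.getHostAddress,
// as implemented in the RemoteSettings change below.
val config = ConfigFactory.parseString("""
  akka.actor.provider = "akka.remote.RemoteActorRefProvider"
  akka.remote.artery.enabled = on
  akka.remote.artery.hostname = "<getHostName>"
  akka.remote.artery.port = 20300
  """).withFallback(ConfigFactory.load())
val system = ActorSystem("ArteryExample", config)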

View file

@ -15,6 +15,7 @@ import akka.event.Logging
import akka.event.Logging.LogLevel
import akka.ConfigurationException
import java.util.Locale
import java.net.InetAddress
final class RemoteSettings(val config: Config) {
import config._
@ -22,7 +23,11 @@ final class RemoteSettings(val config: Config) {
val EnableArtery: Boolean = getBoolean("akka.remote.artery.enabled")
val ArteryPort: Int = getInt("akka.remote.artery.port")
val ArteryHostname: String = getString("akka.remote.artery.hostname")
val ArteryHostname: String = getString("akka.remote.artery.hostname") match {
case "" | "<getHostAddress>" ⇒ InetAddress.getLocalHost.getHostAddress
case "<getHostName>" ⇒ InetAddress.getLocalHost.getHostName
case other ⇒ other
}
val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages")

View file

@ -25,7 +25,7 @@ object AeronSink {
/**
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] {
class AeronSink(channel: String, aeron: Aeron) extends GraphStage[SinkShape[AeronSink.Bytes]] {
import AeronSink._
val in: Inlet[Bytes] = Inlet("AeronSink")
@ -36,10 +36,10 @@ class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkSha
private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024))
private val streamId = 10
private val pub = aeron().addPublication(channel, streamId)
private val pub = aeron.addPublication(channel, streamId)
private val idleStrategy = new BackoffIdleStrategy(
100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100))
private val retries = 120
private val retries = 130
private var backoffCount = retries
private var lastMsgSize = 0
@ -68,13 +68,12 @@ class AeronSink(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SinkSha
if (backoffCount == 1) {
println(s"# drop") // FIXME
pull(in) // drop it
} else if (backoffCount <= 5) {
} else if (backoffCount <= 15) {
// TODO Instead of using the scheduler we should handoff the task of
// retrying/polling to a separate thread that performs the polling for
// all sources/sinks and notifies back when there is some news.
// println(s"# scheduled backoff ${6 - backoffCount}") // FIXME
backoffCount -= 1
if (backoffCount <= 2)
if (backoffCount <= 10)
scheduleOnce(Backoff, 50.millis)
else
scheduleOnce(Backoff, 1.millis)

View file

@ -15,6 +15,7 @@ import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import io.aeron.Aeron
import io.aeron.FragmentAssembler
import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import org.agrona.DirectBuffer
@ -29,7 +30,7 @@ object AeronSource {
/**
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
*/
class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] {
class AeronSource(channel: String, aeron: Aeron) extends GraphStage[SourceShape[AeronSource.Bytes]] {
import AeronSource._
val out: Outlet[Bytes] = Outlet("AeronSource")
@ -38,26 +39,30 @@ class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[Sourc
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with OutHandler {
private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(256))
private val buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(128 * 1024))
private val streamId = 10
private val sub = aeron().addSubscription(channel, streamId)
private val sub = aeron.addSubscription(channel, streamId)
private val running = new AtomicBoolean(true)
private val spinning = 20000
private val yielding = 0
private val parking = 50
private val idleStrategy = new BackoffIdleStrategy(
100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100))
private val retries = 115
private var backoffCount = retries
spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100))
private val idleStrategyRetries = spinning + yielding + parking
private var backoffCount = idleStrategyRetries
private val backoffDuration1 = 1.millis
private val backoffDuration2 = 50.millis
private var messageReceived = false
val receiveMessage = getAsyncCallback[Bytes] { data ⇒
push(out, data)
}
val fragmentHandler: FragmentHandler = new FragmentHandler {
// the fragmentHandler is called from `poll` in same thread, i.e. no async callback is needed
val fragmentHandler = new FragmentAssembler(new FragmentHandler {
override def onFragment(buffer: DirectBuffer, offset: Int, length: Int, header: Header): Unit = {
messageReceived = true
val data = Array.ofDim[Byte](length)
buffer.getBytes(offset, data);
receiveMessage.invoke(data)
push(out, data)
}
}
})
override def postStop(): Unit = {
running.set(false)
@ -67,29 +72,31 @@ class AeronSource(channel: String, aeron: () ⇒ Aeron) extends GraphStage[Sourc
// OutHandler
override def onPull(): Unit = {
idleStrategy.reset()
backoffCount = retries
backoffCount = idleStrategyRetries
subscriberLoop()
}
@tailrec private def subscriberLoop(): Unit =
if (running.get) {
messageReceived = false // will be set by the fragmentHandler if got full msg
// we only poll 1 fragment, otherwise we would have to use another buffer for
// received messages that can't be pushed
val fragmentsRead = sub.poll(fragmentHandler, 1)
if (fragmentsRead <= 0) {
if (fragmentsRead > 0 && !messageReceived)
subscriberLoop() // recursive, read more fragments
else if (fragmentsRead <= 0) {
// TODO the backoff strategy should be measured and tuned
if (backoffCount <= 0) {
backoffCount -= 1
if (backoffCount > 0) {
idleStrategy.idle()
subscriberLoop() // recursive
} else if (backoffCount > -1000) {
// TODO Instead of using the scheduler we should handoff the task of
// retrying/polling to a separate thread that performs the polling for
// all sources/sinks and notifies back when there is some news.
// println(s"# scheduled backoff ${0 - backoffCount + 1}") // FIXME
backoffCount -= 1
if (backoffCount <= -5)
scheduleOnce(Backoff, 50.millis)
else
scheduleOnce(Backoff, 1.millis)
scheduleOnce(Backoff, backoffDuration1)
} else {
idleStrategy.idle()
backoffCount -= 1
subscriberLoop() // recursive
scheduleOnce(Backoff, backoffDuration2)
}
}
}

View file

@ -96,7 +96,7 @@ private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: R
* Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific
* remote address.
*/
private[remote] class Association(
private[akka] class Association(
val materializer: Materializer,
val remoteAddress: Address,
val transport: Transport) {
@ -113,7 +113,8 @@ private[remote] class Association(
// Idempotent
def associate(): Unit = {
queue = Source.queue(256, OverflowStrategy.dropBuffer).to(sink).run()(materializer)
if (queue eq null)
queue = Source.queue(256, OverflowStrategy.dropBuffer).to(sink).run()(materializer)
}
}

View file

@ -24,12 +24,17 @@ import akka.event.LoggingAdapter
import akka.event.Logging
import io.aeron.driver.MediaDriver
import io.aeron.Aeron
import org.agrona.ErrorHandler
import io.aeron.AvailableImageHandler
import io.aeron.Image
import io.aeron.UnavailableImageHandler
import io.aeron.exceptions.ConductorServiceTimeoutException
/**
* INTERNAL API
*/
// FIXME: Replace the codec with a custom made, hi-perf one
private[remote] class Transport(
private[akka] class Transport(
val localAddress: Address,
val system: ExtendedActorSystem,
val materializer: Materializer,
@ -45,14 +50,47 @@ private[remote] class Transport(
private val aeron = {
val ctx = new Aeron.Context
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.unavailableImageHandler(new UnavailableImageHandler {
override def onUnavailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}")
// FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable
}
})
ctx.errorHandler(new ErrorHandler {
override def onError(cause: Throwable): Unit = {
cause match {
case e: ConductorServiceTimeoutException ⇒
// Timeout between service calls
log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}")
case _ ⇒
log.error(cause, s"Aeron error, ${cause.getMessage}")
}
}
})
// TODO also support external media driver
val driver = MediaDriver.launchEmbedded()
val driverContext = new MediaDriver.Context
// FIXME settings from config
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.driverTimeoutMs(SECONDS.toNanos(10))
val driver = MediaDriver.launchEmbedded(driverContext)
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
def start(): Unit = {
Source.fromGraph(new AeronSource(inboundChannel, () ⇒ aeron))
Source.fromGraph(new AeronSource(inboundChannel, aeron))
.async // FIXME use dedicated dispatcher for AeronSource
.map(ByteString.apply) // TODO we should use ByteString all the way
.via(inboundFlow)
.runWith(Sink.ignore)
@ -71,7 +109,7 @@ private[remote] class Transport(
Flow.fromGraph(killSwitch.flow[Send])
.via(encoder)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel, () ⇒ aeron))
.to(new AeronSink(outboundChannel, aeron))
}
// TODO: Try out parallelized serialization (mapAsync) for performance

View file

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.aeron;
package akka.remote.artery;
import java.util.concurrent.locks.LockSupport;

View file

@ -1,4 +1,4 @@
package akka.aeron
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.ActorSystem
@ -17,10 +17,13 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.atomic.AtomicLongArray
import akka.stream.ThrottleMode
import akka.remote.artery.AeronSink
import akka.remote.artery.AeronSource
import org.agrona.ErrorHandler
import io.aeron.AvailableImageHandler
import io.aeron.UnavailableImageHandler
import io.aeron.Image
import io.aeron.AvailableImageHandler
object AeronStreams {
object AeronStreamsApp {
val channel1 = "aeron:udp?endpoint=localhost:40123"
val channel2 = "aeron:udp?endpoint=localhost:40124"
@ -32,7 +35,27 @@ object AeronStreams {
lazy val aeron = {
val ctx = new Aeron.Context
val driver = MediaDriver.launchEmbedded()
ctx.errorHandler(new ErrorHandler {
override def onError(cause: Throwable) {
println(s"# Aeron onError " + cause) // FIXME
}
})
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
println(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.unavailableImageHandler(new UnavailableImageHandler {
override def onUnavailableImage(img: Image): Unit = {
println(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
val driverContext = new MediaDriver.Context
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.driverTimeoutMs(SECONDS.toNanos(10))
val driver = MediaDriver.launchEmbedded(driverContext)
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
@ -111,12 +134,12 @@ object AeronStreams {
var t0 = System.nanoTime()
var count = 0L
var payloadSize = 0L
Source.fromGraph(new AeronSource(channel1, () => aeron))
.map { bytes =>
Source.fromGraph(new AeronSource(channel1, aeron))
.map { bytes ⇒
r.onMessage(1, bytes.length)
bytes
}
.runForeach { bytes =>
.runForeach { bytes ⇒
count += 1
if (count == 1) {
t0 = System.nanoTime()
@ -126,7 +149,7 @@ object AeronStreams {
printTotal(throughputN, "receive", t0, payloadSize)
}
}.onFailure {
case e =>
case e ⇒
e.printStackTrace
exit(-1)
}
@ -138,30 +161,30 @@ object AeronStreams {
val r = reporter
val t0 = System.nanoTime()
Source(1 to throughputN)
.map { n =>
.map { n ⇒
if (n == throughputN) {
exit(0)
printTotal(throughputN, "send", t0, payload.length)
}
n
}
.map { _ =>
.map { _ ⇒
r.onMessage(1, payload.length)
payload
}
.runWith(new AeronSink(channel1, () => aeron))
.runWith(new AeronSink(channel1, aeron))
}
def runEchoReceiver(): Unit = {
// just echo back on channel2
reporterExecutor.execute(reporter)
val r = reporter
Source.fromGraph(new AeronSource(channel1, () => aeron))
.map { bytes =>
Source.fromGraph(new AeronSource(channel1, aeron))
.map { bytes ⇒
r.onMessage(1, bytes.length)
bytes
}
.runWith(new AeronSink(channel2, () => aeron))
.runWith(new AeronSink(channel2, aeron))
}
def runEchoSender(): Unit = {
@ -173,12 +196,12 @@ object AeronStreams {
var repeat = 3
val count = new AtomicInteger
var t0 = System.nanoTime()
Source.fromGraph(new AeronSource(channel2, () => aeron))
.map { bytes =>
Source.fromGraph(new AeronSource(channel2, aeron))
.map { bytes ⇒
r.onMessage(1, bytes.length)
bytes
}
.runForeach { bytes =>
.runForeach { bytes ⇒
val c = count.incrementAndGet()
val d = System.nanoTime() - sendTimes.get(c - 1)
if (c % (latencyN / 10) == 0)
@ -189,7 +212,7 @@ object AeronStreams {
barrier.await() // this is always the last party
}
}.onFailure {
case e =>
case e ⇒
e.printStackTrace
exit(-1)
}
@ -202,13 +225,13 @@ object AeronStreams {
Source(1 to latencyN)
.throttle(latencyRate, 1.second, latencyRate / 10, ThrottleMode.Shaping)
.map { n =>
.map { n ⇒
if (n % (latencyN / 10) == 0)
println(s"# send offset $n") // FIXME
sendTimes.set(n - 1, System.nanoTime())
payload
}
.runWith(new AeronSink(channel1, () => aeron))
.runWith(new AeronSink(channel1, aeron))
barrier.await()
}
@ -218,12 +241,12 @@ object AeronStreams {
def runDebugReceiver(): Unit = {
import system.dispatcher
Source.fromGraph(new AeronSource(channel1, () => aeron))
.map(bytes => new String(bytes, "utf-8"))
.runForeach { s =>
Source.fromGraph(new AeronSource(channel1, aeron))
.map(bytes ⇒ new String(bytes, "utf-8"))
.runForeach { s ⇒
println(s)
}.onFailure {
case e =>
case e ⇒
e.printStackTrace
exit(-1)
}
@ -233,12 +256,12 @@ object AeronStreams {
val fill = "0000"
Source(1 to 1000)
.throttle(1, 1.second, 1, ThrottleMode.Shaping)
.map { n =>
.map { n ⇒
val s = (fill + n.toString).takeRight(4)
println(s)
s.getBytes("utf-8")
}
.runWith(new AeronSink(channel1, () => aeron))
.runWith(new AeronSink(channel1, aeron))
}
}

View file

@ -1,5 +1,6 @@
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSystem, Identify, Props, RootActorPath }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.ConfigFactory
@ -12,6 +13,7 @@ object RemoteSendConsistencySpec {
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.artery.enabled = on
remote.artery.hostname = localhost
}
"""
@ -63,8 +65,8 @@ class RemoteSendConsistencySpec extends AkkaSpec(commonConfig) with ImplicitSend
}
val senderProps = Props(new Actor {
var counter = 1000
remoteRef ! 1000
var counter = 100 // FIXME try this test with 1000, why does it take so long?
remoteRef ! counter
override def receive: Receive = {
case i: Int ⇒
@ -84,10 +86,12 @@ class RemoteSendConsistencySpec extends AkkaSpec(commonConfig) with ImplicitSend
system.actorOf(senderProps)
system.actorOf(senderProps)
expectMsg("success")
expectMsg("success")
expectMsg("success")
expectMsg("success")
within(10.seconds) {
expectMsg("success")
expectMsg("success")
expectMsg("success")
expectMsg("success")
}
}
}

View file

@ -41,8 +41,8 @@ private[akka] class HdrHistogram(
private def wrapHistogramOutOfBoundsException(value: Long, ex: ArrayIndexOutOfBoundsException): IllegalArgumentException =
new IllegalArgumentException(s"Given value $value can not be stored in this histogram " +
s"(min: ${hist.getLowestTrackableValue}, max: ${hist.getHighestTrackableValue}})", ex)
s"(min: ${hist.getLowestDiscernibleValue}, max: ${hist.getHighestTrackableValue}})", ex)
def getData = hist.copy().getHistogramData
def getData = hist.copy()
}

View file

@ -132,7 +132,7 @@ object AkkaBuild extends Build {
lazy val remoteTests = Project(
id = "akka-remote-tests",
base = file("akka-remote-tests"),
dependencies = Seq(actorTests % "test->test", multiNodeTestkit)
dependencies = Seq(actorTests % "test->test", remote % "test->test", multiNodeTestkit)
) configs (MultiJvm)
lazy val cluster = Project(

View file

@ -69,7 +69,6 @@ object Dependencies {
val aeronDriver = "io.aeron" % "aeron-driver" % "0.9.5" // ApacheV2
val aeronClient = "io.aeron" % "aeron-client" % "0.9.5" // ApacheV2
val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" // CC0
object Docs {
val sprayJson = "io.spray" %% "spray-json" % "1.3.2" % "test"
@ -96,7 +95,7 @@ object Dependencies {
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.2" % "test" // ApacheV2
val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.2" % "test" // ApacheV2
val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD
val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.1.4" % "test" // CC0
val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.8" % "test" // CC0
val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram)
// sigar logging
@ -165,7 +164,7 @@ object Dependencies {
val contrib = l ++= Seq(Test.junitIntf, Test.commonsIo)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, hdrHistogram)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
// akka stream & http