=rem #21365 enable multiple lanes in MaxThroughputSpec

This needed the other change for each sender to send to all of the target
actors. Otherwise, large batches of messages to the same target actor would
limit the potential of actually doing work in parallel with multiple lanes due
to head-of-line blocking.
This commit is contained in:
Johannes Rudolph 2016-12-30 12:52:42 +01:00
parent 35feef8d01
commit e66cb028b0

View file

@ -70,52 +70,80 @@ object MaxThroughputSpec extends MultiNodeConfig {
actor-refs.advertisement-interval = 2 second
manifests.advertisement-interval = 2 second
}
advanced {
inbound-lanes = 2
# buffer-pool-size = 512
}
}
}
akka.remote.default-remote-dispatcher {
fork-join-executor {
# parallelism-factor = 0.5
parallelism-min = 2
parallelism-max = 2
}
# Set to 10 by default. Might be worthwhile to experiment with.
# throughput = 100
}
""")).withFallback(RemotingMultiNodeSpec.commonConfig))
case object Run
sealed trait Echo extends DeadLetterSuppression with JavaSerializable
final case object Start extends Echo
final case class Start(correspondingReceiver: ActorRef) extends Echo
final case object End extends Echo
final case class Warmup(msg: AnyRef)
final case class EndResult(totalReceived: Long) extends JavaSerializable
final case class FlowControl(burstStartTime: Long) extends Echo
def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean): Props =
Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics)).withDispatcher("akka.remote.default-remote-dispatcher")
def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props =
Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher")
class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean) extends Actor {
class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int) extends Actor {
private var c = 0L
private val taskRunnerMetrics = new TaskRunnerMetrics(context.system)
private var endMessagesMissing = numSenders
private var correspondingSender: ActorRef = null // the Actor which send the Start message will also receive the report
def receive = {
case msg: Array[Byte]
if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
c += 1
report()
case msg: TestMessage
reporter.onMessage(1, payloadSize)
c += 1
case Start
c = 0
report()
case Start(corresponding)
if (corresponding == self) correspondingSender = sender()
sender() ! Start
case End if endMessagesMissing > 1
endMessagesMissing -= 1 // wait for End message from all senders
case End
if (printTaskRunnerMetrics)
taskRunnerMetrics.printHistograms()
sender() ! EndResult(c)
correspondingSender ! EndResult(c)
context.stop(self)
case m: Echo
sender() ! m
}
def report(): Unit = {
reporter.onMessage(1, payloadSize)
c += 1
}
}
def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef,
def senderProps(mainTarget: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef,
printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props =
Props(new Sender(target, testSettings, plotRef, printTaskRunnerMetrics, reporter))
Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter))
class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter)
class Sender(target: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter)
extends Actor {
val numTargets = targets.size
import testSettings._
val payload = ("0" * testSettings.payloadSize).getBytes("utf-8")
var startTime = 0L
@ -132,50 +160,59 @@ object MaxThroughputSpec extends MultiNodeConfig {
def receive = {
case Run
if (compressionEnabled) {
target ! payload
target ! Warmup(payload)
context.setReceiveTimeout(1.second)
context.become(waitingForCompression)
} else {
sendBatch() // first some warmup
target ! Start // then Start, which will echo back here
context.become(active)
}
} else runWarmup()
}
def waitingForCompression: Receive = {
case ReceivedActorRefCompressionTable(_, table)
if (table.dictionary.contains(target)) {
sendBatch() // first some warmup
target ! Start // then Start, which will echo back here
context.setReceiveTimeout(Duration.Undefined)
context.become(active)
runWarmup()
} else
target ! payload
target ! Warmup(payload)
case ReceiveTimeout
target ! payload
target ! Warmup(payload)
}
def active: Receive = {
def runWarmup(): Unit = {
sendBatch(warmup = true) // first some warmup
targets.foreach(_ ! Start(target)) // then Start, which will echo back here
context.become(warmup)
}
def warmup: Receive = {
case Start
println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " +
s"$burstSize and payload size $payloadSize")
startTime = System.nanoTime
remaining = totalMessages
(0 until sent.size).foreach(i sent(i) = 0)
// have a few batches in flight to make sure there are always messages to send
(1 to 3).foreach { _
val t0 = System.nanoTime()
sendBatch()
sendBatch(warmup = false)
sendFlowControl(t0)
}
context.become(active)
case _: Warmup
}
def active: Receive = {
case c @ FlowControl(t0)
val now = System.nanoTime()
val duration = NANOSECONDS.toMillis(now - t0)
maxRoundTripMillis = math.max(maxRoundTripMillis, duration)
sendBatch()
sendBatch(warmup = false)
sendFlowControl(now)
}
val waitingForEndResult: Receive = {
case EndResult(totalReceived)
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = (totalReceived * 1000.0 / took)
@ -191,7 +228,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
s"burst size $burstSize, " +
s"payload size $payloadSize, " +
s"total size ${totalSize(context.system)}, " +
s"$took ms to deliver $totalReceived messages")
s"$took ms to deliver $totalReceived messages.")
if (printTaskRunnerMetrics)
taskRunnerMetrics.printHistograms()
@ -202,11 +239,12 @@ object MaxThroughputSpec extends MultiNodeConfig {
case c: ReceivedActorRefCompressionTable
}
def sendBatch(): Unit = {
val sent = new Array[Long](targets.size)
def sendBatch(warmup: Boolean): Unit = {
val batchSize = math.min(remaining, burstSize)
var i = 0
while (i < batchSize) {
val msg =
val msg0 =
if (realMessage)
TestMessage(
id = totalMessages - remaining + i,
@ -217,17 +255,20 @@ object MaxThroughputSpec extends MultiNodeConfig {
items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B")))
else payload
// target ! msg
target.tell(msg, ActorRef.noSender)
val msg1 = if (warmup) Warmup(msg0) else msg0
targets(i % numTargets).tell(msg1, ActorRef.noSender)
sent(i % numTargets) += 1
i += 1
}
remaining -= batchSize
}
def sendFlowControl(t0: Long): Unit = {
if (remaining <= 0)
target ! End
else
if (remaining <= 0) {
context.become(waitingForEndResult)
targets.foreach(_ ! End)
} else
target ! FlowControl(t0)
}
}
@ -315,7 +356,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec
def identifyReceiver(name: String, r: RoleName = second): ActorRef = {
system.actorSelection(node(r) / "user" / name) ! Identify(None)
expectMsgType[ActorIdentity].ref.get
expectMsgType[ActorIdentity](10.seconds).ref.get
}
val scenarios = List(
@ -365,7 +406,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec
val rep = reporter(testName)
for (n 1 to senderReceiverPairs) {
val receiver = system.actorOf(
receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1),
receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1, senderReceiverPairs),
receiverName + n)
}
enterBarrier(receiverName + "-started")
@ -376,11 +417,12 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec
runOn(first) {
enterBarrier(receiverName + "-started")
val ignore = TestProbe()
val receivers = (for (n 1 to senderReceiverPairs) yield identifyReceiver(receiverName + n)).toArray
val senders = for (n 1 to senderReceiverPairs) yield {
val receiver = identifyReceiver(receiverName + n)
val receiver = receivers(n - 1)
val plotProbe = TestProbe()
val snd = system.actorOf(
senderProps(receiver, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter),
senderProps(receiver, receivers, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter),
testName + "-snd" + n)
val terminationProbe = TestProbe()
terminationProbe.watch(snd)