From e66cb028b025c5e6da36ffc674ccccc1d60bfde9 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 30 Dec 2016 12:52:42 +0100 Subject: [PATCH] =rem #21365 enable multiple lanes in MaxThroughputSpec This needed the other change for each sender to send to all of the target actors. Otherwise, large batches of messages to the same target actor would limit the potential of actually doing work in parallel with multiple lanes due to head-of-line blocking. --- .../remote/artery/MaxThroughputSpec.scala | 122 ++++++++++++------ 1 file changed, 82 insertions(+), 40 deletions(-) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 51605c6bad..a4a8aedb74 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -70,52 +70,80 @@ object MaxThroughputSpec extends MultiNodeConfig { actor-refs.advertisement-interval = 2 second manifests.advertisement-interval = 2 second } + + advanced { + inbound-lanes = 2 + # buffer-pool-size = 512 + } } } + akka.remote.default-remote-dispatcher { + fork-join-executor { + # parallelism-factor = 0.5 + parallelism-min = 2 + parallelism-max = 2 + } + # Set to 10 by default. Might be worthwhile to experiment with. + # throughput = 100 + } """)).withFallback(RemotingMultiNodeSpec.commonConfig)) case object Run sealed trait Echo extends DeadLetterSuppression with JavaSerializable - final case object Start extends Echo + final case class Start(correspondingReceiver: ActorRef) extends Echo final case object End extends Echo + final case class Warmup(msg: AnyRef) final case class EndResult(totalReceived: Long) extends JavaSerializable final case class FlowControl(burstStartTime: Long) extends Echo - def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean): Props = - Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics)).withDispatcher("akka.remote.default-remote-dispatcher") + def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props = + Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher") - class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean) extends Actor { + class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int) extends Actor { private var c = 0L private val taskRunnerMetrics = new TaskRunnerMetrics(context.system) + private var endMessagesMissing = numSenders + private var correspondingSender: ActorRef = null // the Actor which send the Start message will also receive the report def receive = { case msg: Array[Byte] ⇒ if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message") - reporter.onMessage(1, payloadSize) - c += 1 + + report() case msg: TestMessage ⇒ - reporter.onMessage(1, payloadSize) - c += 1 - case Start ⇒ - c = 0 + report() + + case Start(corresponding) ⇒ + if (corresponding == self) correspondingSender = sender() sender() ! Start + + case End if endMessagesMissing > 1 ⇒ + endMessagesMissing -= 1 // wait for End message from all senders + case End ⇒ if (printTaskRunnerMetrics) taskRunnerMetrics.printHistograms() - sender() ! EndResult(c) + correspondingSender ! EndResult(c) context.stop(self) + case m: Echo ⇒ sender() ! m + } + def report(): Unit = { + reporter.onMessage(1, payloadSize) + c += 1 } } - def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, + def senderProps(mainTarget: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props = - Props(new Sender(target, testSettings, plotRef, printTaskRunnerMetrics, reporter)) + Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter)) - class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) + class Sender(target: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) extends Actor { + val numTargets = targets.size + import testSettings._ val payload = ("0" * testSettings.payloadSize).getBytes("utf-8") var startTime = 0L @@ -132,50 +160,59 @@ object MaxThroughputSpec extends MultiNodeConfig { def receive = { case Run ⇒ if (compressionEnabled) { - target ! payload + target ! Warmup(payload) context.setReceiveTimeout(1.second) context.become(waitingForCompression) - } else { - sendBatch() // first some warmup - target ! Start // then Start, which will echo back here - context.become(active) - } + } else runWarmup() } def waitingForCompression: Receive = { case ReceivedActorRefCompressionTable(_, table) ⇒ if (table.dictionary.contains(target)) { - sendBatch() // first some warmup - target ! Start // then Start, which will echo back here context.setReceiveTimeout(Duration.Undefined) - context.become(active) + runWarmup() } else - target ! payload + target ! Warmup(payload) case ReceiveTimeout ⇒ - target ! payload + target ! Warmup(payload) } - def active: Receive = { + def runWarmup(): Unit = { + sendBatch(warmup = true) // first some warmup + targets.foreach(_ ! Start(target)) // then Start, which will echo back here + context.become(warmup) + } + + def warmup: Receive = { case Start ⇒ println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " + s"$burstSize and payload size $payloadSize") startTime = System.nanoTime remaining = totalMessages + (0 until sent.size).foreach(i ⇒ sent(i) = 0) // have a few batches in flight to make sure there are always messages to send (1 to 3).foreach { _ ⇒ val t0 = System.nanoTime() - sendBatch() + sendBatch(warmup = false) sendFlowControl(t0) } + context.become(active) + + case _: Warmup ⇒ + } + + def active: Receive = { case c @ FlowControl(t0) ⇒ val now = System.nanoTime() val duration = NANOSECONDS.toMillis(now - t0) maxRoundTripMillis = math.max(maxRoundTripMillis, duration) - sendBatch() + sendBatch(warmup = false) sendFlowControl(now) + } + val waitingForEndResult: Receive = { case EndResult(totalReceived) ⇒ val took = NANOSECONDS.toMillis(System.nanoTime - startTime) val throughput = (totalReceived * 1000.0 / took) @@ -191,7 +228,7 @@ object MaxThroughputSpec extends MultiNodeConfig { s"burst size $burstSize, " + s"payload size $payloadSize, " + s"total size ${totalSize(context.system)}, " + - s"$took ms to deliver $totalReceived messages") + s"$took ms to deliver $totalReceived messages.") if (printTaskRunnerMetrics) taskRunnerMetrics.printHistograms() @@ -202,11 +239,12 @@ object MaxThroughputSpec extends MultiNodeConfig { case c: ReceivedActorRefCompressionTable ⇒ } - def sendBatch(): Unit = { + val sent = new Array[Long](targets.size) + def sendBatch(warmup: Boolean): Unit = { val batchSize = math.min(remaining, burstSize) var i = 0 while (i < batchSize) { - val msg = + val msg0 = if (realMessage) TestMessage( id = totalMessages - remaining + i, @@ -217,17 +255,20 @@ object MaxThroughputSpec extends MultiNodeConfig { items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B"))) else payload - // target ! msg - target.tell(msg, ActorRef.noSender) + val msg1 = if (warmup) Warmup(msg0) else msg0 + + targets(i % numTargets).tell(msg1, ActorRef.noSender) + sent(i % numTargets) += 1 i += 1 } remaining -= batchSize } def sendFlowControl(t0: Long): Unit = { - if (remaining <= 0) - target ! End - else + if (remaining <= 0) { + context.become(waitingForEndResult) + targets.foreach(_ ! End) + } else target ! FlowControl(t0) } } @@ -315,7 +356,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec def identifyReceiver(name: String, r: RoleName = second): ActorRef = { system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity].ref.get + expectMsgType[ActorIdentity](10.seconds).ref.get } val scenarios = List( @@ -365,7 +406,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec val rep = reporter(testName) for (n ← 1 to senderReceiverPairs) { val receiver = system.actorOf( - receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1), + receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1, senderReceiverPairs), receiverName + n) } enterBarrier(receiverName + "-started") @@ -376,11 +417,12 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec runOn(first) { enterBarrier(receiverName + "-started") val ignore = TestProbe() + val receivers = (for (n ← 1 to senderReceiverPairs) yield identifyReceiver(receiverName + n)).toArray val senders = for (n ← 1 to senderReceiverPairs) yield { - val receiver = identifyReceiver(receiverName + n) + val receiver = receivers(n - 1) val plotProbe = TestProbe() val snd = system.actorOf( - senderProps(receiver, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter), + senderProps(receiver, receivers, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter), testName + "-snd" + n) val terminationProbe = TestProbe() terminationProbe.watch(snd)