diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 51605c6bad..a4a8aedb74 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -70,52 +70,80 @@ object MaxThroughputSpec extends MultiNodeConfig { actor-refs.advertisement-interval = 2 second manifests.advertisement-interval = 2 second } + + advanced { + inbound-lanes = 2 + # buffer-pool-size = 512 + } } } + akka.remote.default-remote-dispatcher { + fork-join-executor { + # parallelism-factor = 0.5 + parallelism-min = 2 + parallelism-max = 2 + } + # Set to 10 by default. Might be worthwhile to experiment with. + # throughput = 100 + } """)).withFallback(RemotingMultiNodeSpec.commonConfig)) case object Run sealed trait Echo extends DeadLetterSuppression with JavaSerializable - final case object Start extends Echo + final case class Start(correspondingReceiver: ActorRef) extends Echo final case object End extends Echo + final case class Warmup(msg: AnyRef) final case class EndResult(totalReceived: Long) extends JavaSerializable final case class FlowControl(burstStartTime: Long) extends Echo - def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean): Props = - Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics)).withDispatcher("akka.remote.default-remote-dispatcher") + def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props = + Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher") - class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean) extends Actor { + class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int) extends Actor { private var c = 0L private val taskRunnerMetrics = new TaskRunnerMetrics(context.system) + private var endMessagesMissing = numSenders + private var correspondingSender: ActorRef = null // the Actor which send the Start message will also receive the report def receive = { case msg: Array[Byte] ⇒ if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message") - reporter.onMessage(1, payloadSize) - c += 1 + + report() case msg: TestMessage ⇒ - reporter.onMessage(1, payloadSize) - c += 1 - case Start ⇒ - c = 0 + report() + + case Start(corresponding) ⇒ + if (corresponding == self) correspondingSender = sender() sender() ! Start + + case End if endMessagesMissing > 1 ⇒ + endMessagesMissing -= 1 // wait for End message from all senders + case End ⇒ if (printTaskRunnerMetrics) taskRunnerMetrics.printHistograms() - sender() ! EndResult(c) + correspondingSender ! EndResult(c) context.stop(self) + case m: Echo ⇒ sender() ! m + } + def report(): Unit = { + reporter.onMessage(1, payloadSize) + c += 1 } } - def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, + def senderProps(mainTarget: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props = - Props(new Sender(target, testSettings, plotRef, printTaskRunnerMetrics, reporter)) + Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter)) - class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) + class Sender(target: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) extends Actor { + val numTargets = targets.size + import testSettings._ val payload = ("0" * testSettings.payloadSize).getBytes("utf-8") var startTime = 0L @@ -132,50 +160,59 @@ object MaxThroughputSpec extends MultiNodeConfig { def receive = { case Run ⇒ if (compressionEnabled) { - target ! payload + target ! Warmup(payload) context.setReceiveTimeout(1.second) context.become(waitingForCompression) - } else { - sendBatch() // first some warmup - target ! Start // then Start, which will echo back here - context.become(active) - } + } else runWarmup() } def waitingForCompression: Receive = { case ReceivedActorRefCompressionTable(_, table) ⇒ if (table.dictionary.contains(target)) { - sendBatch() // first some warmup - target ! Start // then Start, which will echo back here context.setReceiveTimeout(Duration.Undefined) - context.become(active) + runWarmup() } else - target ! payload + target ! Warmup(payload) case ReceiveTimeout ⇒ - target ! payload + target ! Warmup(payload) } - def active: Receive = { + def runWarmup(): Unit = { + sendBatch(warmup = true) // first some warmup + targets.foreach(_ ! Start(target)) // then Start, which will echo back here + context.become(warmup) + } + + def warmup: Receive = { case Start ⇒ println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " + s"$burstSize and payload size $payloadSize") startTime = System.nanoTime remaining = totalMessages + (0 until sent.size).foreach(i ⇒ sent(i) = 0) // have a few batches in flight to make sure there are always messages to send (1 to 3).foreach { _ ⇒ val t0 = System.nanoTime() - sendBatch() + sendBatch(warmup = false) sendFlowControl(t0) } + context.become(active) + + case _: Warmup ⇒ + } + + def active: Receive = { case c @ FlowControl(t0) ⇒ val now = System.nanoTime() val duration = NANOSECONDS.toMillis(now - t0) maxRoundTripMillis = math.max(maxRoundTripMillis, duration) - sendBatch() + sendBatch(warmup = false) sendFlowControl(now) + } + val waitingForEndResult: Receive = { case EndResult(totalReceived) ⇒ val took = NANOSECONDS.toMillis(System.nanoTime - startTime) val throughput = (totalReceived * 1000.0 / took) @@ -191,7 +228,7 @@ object MaxThroughputSpec extends MultiNodeConfig { s"burst size $burstSize, " + s"payload size $payloadSize, " + s"total size ${totalSize(context.system)}, " + - s"$took ms to deliver $totalReceived messages") + s"$took ms to deliver $totalReceived messages.") if (printTaskRunnerMetrics) taskRunnerMetrics.printHistograms() @@ -202,11 +239,12 @@ object MaxThroughputSpec extends MultiNodeConfig { case c: ReceivedActorRefCompressionTable ⇒ } - def sendBatch(): Unit = { + val sent = new Array[Long](targets.size) + def sendBatch(warmup: Boolean): Unit = { val batchSize = math.min(remaining, burstSize) var i = 0 while (i < batchSize) { - val msg = + val msg0 = if (realMessage) TestMessage( id = totalMessages - remaining + i, @@ -217,17 +255,20 @@ object MaxThroughputSpec extends MultiNodeConfig { items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B"))) else payload - // target ! msg - target.tell(msg, ActorRef.noSender) + val msg1 = if (warmup) Warmup(msg0) else msg0 + + targets(i % numTargets).tell(msg1, ActorRef.noSender) + sent(i % numTargets) += 1 i += 1 } remaining -= batchSize } def sendFlowControl(t0: Long): Unit = { - if (remaining <= 0) - target ! End - else + if (remaining <= 0) { + context.become(waitingForEndResult) + targets.foreach(_ ! End) + } else target ! FlowControl(t0) } } @@ -315,7 +356,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec def identifyReceiver(name: String, r: RoleName = second): ActorRef = { system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity].ref.get + expectMsgType[ActorIdentity](10.seconds).ref.get } val scenarios = List( @@ -365,7 +406,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec val rep = reporter(testName) for (n ← 1 to senderReceiverPairs) { val receiver = system.actorOf( - receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1), + receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1, senderReceiverPairs), receiverName + n) } enterBarrier(receiverName + "-started") @@ -376,11 +417,12 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec runOn(first) { enterBarrier(receiverName + "-started") val ignore = TestProbe() + val receivers = (for (n ← 1 to senderReceiverPairs) yield identifyReceiver(receiverName + n)).toArray val senders = for (n ← 1 to senderReceiverPairs) yield { - val receiver = identifyReceiver(receiverName + n) + val receiver = receivers(n - 1) val plotProbe = TestProbe() val snd = system.actorOf( - senderProps(receiver, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter), + senderProps(receiver, receivers, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter), testName + "-snd" + n) val terminationProbe = TestProbe() terminationProbe.watch(snd)