From 9f3c6a57423c145d1492c69364b59c501155dff2 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Tue, 28 Apr 2020 14:11:25 +0100 Subject: [PATCH] Log demand and nr of messages sent in delivery throughput sharding test We can't really turn on any real logs due to the throughput here. Refs #28972 --- .../delivery/DeliveryThroughputSpec.scala | 67 ++++++++++++------- 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala index 9995042c79..c0ff1853e6 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala @@ -37,6 +37,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.PerfFlamesSupport import akka.serialization.jackson.CborSerializable +import akka.actor.typed.scaladsl.LoggerOps object DeliveryThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -232,6 +233,7 @@ object DeliveryThroughputSpec extends MultiNodeConfig { case object Run extends Command private case class WrappedRequestNext(r: ShardingProducerController.RequestNext[Consumer.Command]) extends Command + private case object PrintStatus extends Command def apply( producerController: ActorRef[ShardingProducerController.Command[Consumer.Command]], @@ -240,35 +242,54 @@ object DeliveryThroughputSpec extends MultiNodeConfig { resultReporter: BenchmarkFileReporter): Behavior[Command] = { val numberOfMessages = testSettings.totalMessages - Behaviors.setup { context => - val requestNextAdapter = - context.messageAdapter[ShardingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_)) - var startTime = System.nanoTime() - var remaining = numberOfMessages + context.system.settings.config - .getInt("akka.reliable-delivery.sharding.consumer-controller.flow-control-window") + Behaviors.withTimers { timers => + Behaviors.setup { context => + timers.startTimerWithFixedDelay(PrintStatus, 1.second) + val requestNextAdapter = + context.messageAdapter[ShardingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_)) + var startTime = System.nanoTime() + var remaining = numberOfMessages + context.system.settings.config + .getInt("akka.reliable-delivery.sharding.consumer-controller.flow-control-window") + var latestDemand: ShardingProducerController.RequestNext[Consumer.Command] = null + var messagesSentToEachEntity: Map[String, Long] = Map.empty[String, Long].withDefaultValue(0L) - Behaviors.receiveMessage { - case WrappedRequestNext(next) => - remaining -= 1 - if (remaining == 0) { - context.log.info("Completed {} messages", numberOfMessages) - Producer.reportEnd(startTime, testSettings, plotRef, resultReporter) - Behaviors.stopped - } else { - val entityId = (remaining % testSettings.numberOfConsumers).toString - if (next.entitiesWithDemand(entityId) || !next.bufferedForEntitiesWithoutDemand.contains(entityId)) - next.sendNextTo ! ShardingEnvelope(entityId, Consumer.TheMessage) + Behaviors.receiveMessage { + case WrappedRequestNext(next) => + latestDemand = next + remaining -= 1 + if (remaining == 0) { + context.log.info("Completed {} messages", numberOfMessages) + Producer.reportEnd(startTime, testSettings, plotRef, resultReporter) + Behaviors.stopped + } else { + val entityId = (remaining % testSettings.numberOfConsumers).toString + if (next.entitiesWithDemand(entityId) || !next.bufferedForEntitiesWithoutDemand.contains(entityId)) { + messagesSentToEachEntity = + messagesSentToEachEntity.updated(entityId, messagesSentToEachEntity(entityId) + 1L) + + next.sendNextTo ! ShardingEnvelope(entityId, Consumer.TheMessage) + } + Behaviors.same + } + case Run => + context.log.info("Starting {} messages", numberOfMessages) + startTime = System.nanoTime() + producerController ! ShardingProducerController.Start(requestNextAdapter) Behaviors.same - } - case Run => - context.log.info("Starting {} messages", numberOfMessages) - startTime = System.nanoTime() - producerController ! ShardingProducerController.Start(requestNextAdapter) - Behaviors.same + case PrintStatus => + context.log.infoN( + "Remaining {}. Latest demand {}. Messages sent {}. Expecting demand from {}", + remaining, + latestDemand, + messagesSentToEachEntity, + (remaining % testSettings.numberOfConsumers)) + Behaviors.same + } } } } + } final case class TestSettings(testName: String, totalMessages: Long, numberOfConsumers: Int)