Merge pull request #28989 from chbatey/issue-38972

This commit is contained in:
Renato Cavalcanti 2020-05-06 13:53:54 +02:00 committed by GitHub
commit f04559de66
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -37,6 +37,7 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.PerfFlamesSupport
import akka.serialization.jackson.CborSerializable
import akka.actor.typed.scaladsl.LoggerOps
object DeliveryThroughputSpec extends MultiNodeConfig {
val first = role("first")
@ -232,6 +233,7 @@ object DeliveryThroughputSpec extends MultiNodeConfig {
case object Run extends Command
private case class WrappedRequestNext(r: ShardingProducerController.RequestNext[Consumer.Command]) extends Command
private case object PrintStatus extends Command
def apply(
producerController: ActorRef[ShardingProducerController.Command[Consumer.Command]],
@ -240,35 +242,54 @@ object DeliveryThroughputSpec extends MultiNodeConfig {
resultReporter: BenchmarkFileReporter): Behavior[Command] = {
val numberOfMessages = testSettings.totalMessages
Behaviors.setup { context =>
val requestNextAdapter =
context.messageAdapter[ShardingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_))
var startTime = System.nanoTime()
var remaining = numberOfMessages + context.system.settings.config
.getInt("akka.reliable-delivery.sharding.consumer-controller.flow-control-window")
Behaviors.withTimers { timers =>
Behaviors.setup { context =>
timers.startTimerWithFixedDelay(PrintStatus, 1.second)
val requestNextAdapter =
context.messageAdapter[ShardingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_))
var startTime = System.nanoTime()
var remaining = numberOfMessages + context.system.settings.config
.getInt("akka.reliable-delivery.sharding.consumer-controller.flow-control-window")
var latestDemand: ShardingProducerController.RequestNext[Consumer.Command] = null
var messagesSentToEachEntity: Map[String, Long] = Map.empty[String, Long].withDefaultValue(0L)
Behaviors.receiveMessage {
case WrappedRequestNext(next) =>
remaining -= 1
if (remaining == 0) {
context.log.info("Completed {} messages", numberOfMessages)
Producer.reportEnd(startTime, testSettings, plotRef, resultReporter)
Behaviors.stopped
} else {
val entityId = (remaining % testSettings.numberOfConsumers).toString
if (next.entitiesWithDemand(entityId) || !next.bufferedForEntitiesWithoutDemand.contains(entityId))
next.sendNextTo ! ShardingEnvelope(entityId, Consumer.TheMessage)
Behaviors.receiveMessage {
case WrappedRequestNext(next) =>
latestDemand = next
remaining -= 1
if (remaining == 0) {
context.log.info("Completed {} messages", numberOfMessages)
Producer.reportEnd(startTime, testSettings, plotRef, resultReporter)
Behaviors.stopped
} else {
val entityId = (remaining % testSettings.numberOfConsumers).toString
if (next.entitiesWithDemand(entityId) || !next.bufferedForEntitiesWithoutDemand.contains(entityId)) {
messagesSentToEachEntity =
messagesSentToEachEntity.updated(entityId, messagesSentToEachEntity(entityId) + 1L)
next.sendNextTo ! ShardingEnvelope(entityId, Consumer.TheMessage)
}
Behaviors.same
}
case Run =>
context.log.info("Starting {} messages", numberOfMessages)
startTime = System.nanoTime()
producerController ! ShardingProducerController.Start(requestNextAdapter)
Behaviors.same
}
case Run =>
context.log.info("Starting {} messages", numberOfMessages)
startTime = System.nanoTime()
producerController ! ShardingProducerController.Start(requestNextAdapter)
Behaviors.same
case PrintStatus =>
context.log.infoN(
"Remaining {}. Latest demand {}. Messages sent {}. Expecting demand from {}",
remaining,
latestDemand,
messagesSentToEachEntity,
(remaining % testSettings.numberOfConsumers))
Behaviors.same
}
}
}
}
}
final case class TestSettings(testName: String, totalMessages: Long, numberOfConsumers: Int)