Merge pull request #28874 from akka/wip-28725-delivery-jmh-work-pulling-patriknw

JMH bench for work pulling, 28725
This commit is contained in:
Patrik Nordwall 2020-04-13 19:08:26 +02:00 committed by GitHub
commit d611bb0d68
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -9,11 +9,15 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.delivery.ProducerController.MessageWithConfirmation
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory
@ -24,9 +28,13 @@ object Producer {
case object Run extends Command
private case class WrappedRequestNext(r: ProducerController.RequestNext[Consumer.Command]) extends Command
private case object AskReply extends Command
private implicit val askTimeout: akka.util.Timeout = 5.seconds
def apply(
numberOfMessages: Int,
useAsk: Boolean,
producerController: ActorRef[ProducerController.Command[Consumer.Command]]): Behavior[Command] = {
Behaviors.setup { context =>
val requestNextAdapter =
@ -37,6 +45,14 @@ object Producer {
if (next.confirmedSeqNr >= numberOfMessages) {
context.log.info("Completed {} messages", numberOfMessages)
Behaviors.stopped
} else if (useAsk) {
context.ask[MessageWithConfirmation[Consumer.Command], ProducerController.SeqNr](
next.askNextTo,
askReplyTo => MessageWithConfirmation(Consumer.TheMessage, askReplyTo)) {
case Success(_) => AskReply
case Failure(e) => throw e
}
Behaviors.same
} else {
next.sendNextTo ! Consumer.TheMessage
Behaviors.same
@ -46,6 +62,9 @@ object Producer {
context.log.info("Starting {} messages", numberOfMessages)
producerController ! ProducerController.Start(requestNextAdapter)
Behaviors.same
case AskReply =>
Behaviors.same
}
}
}
@ -58,6 +77,9 @@ object Consumer {
private case class WrappedDelivery(d: ConsumerController.Delivery[Command]) extends Command
def serviceKey(testName: String): ServiceKey[ConsumerController.Command[Command]] =
ServiceKey[ConsumerController.Command[Consumer.Command]](testName)
def apply(consumerController: ActorRef[ConsumerController.Command[Command]]): Behavior[Command] = {
Behaviors.setup { context =>
val deliveryAdapter =
@ -74,33 +96,88 @@ object Consumer {
}
}
object WorkPullingProducer {
trait Command
case object Run extends Command
private case class WrappedRequestNext(r: WorkPullingProducerController.RequestNext[Consumer.Command]) extends Command
def apply(
numberOfMessages: Int,
producerController: ActorRef[WorkPullingProducerController.Command[Consumer.Command]]): Behavior[Command] = {
Behaviors.setup { context =>
val requestNextAdapter =
context.messageAdapter[WorkPullingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_))
var remaining = numberOfMessages + context.system.settings.config
.getInt("akka.reliable-delivery.consumer-controller.flow-control-window")
Behaviors.receiveMessage {
case WrappedRequestNext(next) =>
remaining -= 1
if (remaining == 0) {
context.log.info("Completed {} messages", numberOfMessages)
Behaviors.stopped
} else {
next.sendNextTo ! Consumer.TheMessage
Behaviors.same
}
case Run =>
context.log.info("Starting {} messages", numberOfMessages)
producerController ! WorkPullingProducerController.Start(requestNextAdapter)
Behaviors.same
}
}
}
}
object Guardian {
trait Command
final case class Run(id: String, numberOfMessages: Int, replyTo: ActorRef[Done]) extends Command
final case class ProducerTerminated(consumer: ActorRef[Consumer.Command], replyTo: ActorRef[Done]) extends Command
final case class RunPointToPoint(id: String, numberOfMessages: Int, useAsk: Boolean, replyTo: ActorRef[Done])
extends Command
final case class RunWorkPulling(id: String, numberOfMessages: Int, workers: Int, replyTo: ActorRef[Done])
extends Command
final case class ProducerTerminated(consumers: List[ActorRef[Consumer.Command]], replyTo: ActorRef[Done])
extends Command
def apply(): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.receiveMessage {
case Run(id, numberOfMessages, replyTo) =>
val consumerController = context.spawn(ConsumerController[Consumer.Command](), s"consumerController-$id")
val consumer = context.spawn(Consumer(consumerController), s"consumer-$id")
case RunPointToPoint(id, numberOfMessages, useAsk, replyTo) =>
// point-to-point
val consumerController =
context.spawn(ConsumerController[Consumer.Command](), s"consumerController-$id")
val consumers = List(context.spawn(Consumer(consumerController), s"consumer-$id"))
val producerController = context.spawn(
ProducerController[Consumer.Command](id, durableQueueBehavior = None),
s"producerController-$id")
val producer = context.spawn(Producer(numberOfMessages, producerController), s"producer-$id")
context.watchWith(producer, ProducerTerminated(consumer, replyTo))
val producer = context.spawn(Producer(numberOfMessages, useAsk, producerController), s"producer-$id")
consumerController ! ConsumerController.RegisterToProducerController(producerController)
context.watchWith(producer, ProducerTerminated(consumers, replyTo))
producer ! Producer.Run
Behaviors.same
case ProducerTerminated(consumer, replyTo) =>
context.stop(consumer)
case RunWorkPulling(id, numberOfMessages, workers, replyTo) =>
// workPulling
val sKey = Consumer.serviceKey(id)
val consumerController =
context.spawn(ConsumerController[Consumer.Command](sKey), s"consumerController-$id")
val consumers = (1 to workers).map { n =>
context.spawn(Consumer(consumerController), s"consumer-$n-$id")
}.toList
val producerController = context.spawn(
WorkPullingProducerController[Consumer.Command](id, sKey, durableQueueBehavior = None),
s"producerController-$id")
val producer = context.spawn(WorkPullingProducer(numberOfMessages, producerController), s"producer-$id")
context.watchWith(producer, ProducerTerminated(consumers, replyTo))
producer ! WorkPullingProducer.Run
Behaviors.same
case ProducerTerminated(consumers, replyTo) =>
consumers.foreach(context.stop)
replyTo ! Done
Behaviors.same
}
@ -122,7 +199,7 @@ object ReliableDeliveryBenchmark {
class ReliableDeliveryBenchmark {
import ReliableDeliveryBenchmark._
@Param(Array("10", "50"))
@Param(Array("50"))
var window = 0
implicit var system: ActorSystem[Guardian.Command] = _
@ -150,8 +227,36 @@ class ReliableDeliveryBenchmark {
@Benchmark
@OperationsPerInvocation(messagesPerOperation)
def echo(): Unit = {
Await.result(system.ask(Guardian.Run(UUID.randomUUID().toString, messagesPerOperation, _)), timeout)
def pointToPoint(): Unit = {
Await.result(
system.ask(
Guardian.RunPointToPoint(s"point-to-point-${UUID.randomUUID()}", messagesPerOperation, useAsk = false, _)),
timeout)
}
@Benchmark
@OperationsPerInvocation(messagesPerOperation)
def pointToPointAsk(): Unit = {
Await.result(
system.ask(
Guardian.RunPointToPoint(s"point-to-point-${UUID.randomUUID()}", messagesPerOperation, useAsk = true, _)),
timeout)
}
@Benchmark
@OperationsPerInvocation(messagesPerOperation)
def workPulling1(): Unit = {
Await.result(
system.ask(Guardian.RunWorkPulling(s"work-pulling-${UUID.randomUUID()}", messagesPerOperation, workers = 1, _)),
timeout)
}
@Benchmark
@OperationsPerInvocation(messagesPerOperation)
def workPulling2(): Unit = {
Await.result(
system.ask(Guardian.RunWorkPulling(s"work-pulling-${UUID.randomUUID()}", messagesPerOperation, workers = 2, _)),
timeout)
}
}