JMH bench and flight recorder for reliable delivery, #28725 (#28741)

* JMH bench for reliable delivery
* JFR flight recorder for reliable delivery
* disable hi-freq events by default
* CompileJdk9
* fix validateCompile
This commit is contained in:
Patrik Nordwall 2020-03-24 07:59:33 +01:00 committed by GitHub
parent 7f2773024f
commit b1346ad7a7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 563 additions and 18 deletions

View file

@ -0,0 +1,161 @@
/*
* Copyright (C) extends Event 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.jfr
import akka.annotation.InternalApi
import jdk.jfr.Category
import jdk.jfr.Enabled
import jdk.jfr.Event
import jdk.jfr.Label
import jdk.jfr.StackTrace
// requires jdk9+ to compile
// for editing these in IntelliJ, open module settings, change JDK dependency to 11 for only this module
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController created")
final class DeliveryProducerCreated(val producerId: String, val actorPath: String) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController started")
final class DeliveryProducerStarted(val producerId: String, val actorPath: String) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController sent RequestNext")
final class DeliveryProducerRequestNext(val producerId: String, val currentSeqNr: Long, val confirmedSeqNr: Long)
extends Event
/** INTERNAL API */
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController sent SequencedMessage")
final class DeliveryProducerSent(val producerId: String, val seqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController waiting for demand")
final class DeliveryProducerWaitingForRequest(val producerId: String, val currentSeqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController resent unconfirmed")
final class DeliveryProducerResentUnconfirmed(val producerId: String, val fromSeqNr: Long, val toSeqNr: Long)
extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController resent first")
final class DeliveryProducerResentFirst(val producerId: String, val firstSeqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label(
"Delivery ProducerController resent first unconfirmed")
final class DeliveryProducerResentFirstUnconfirmed(val producerId: String, val seqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received message")
final class DeliveryProducerReceived(val producerId: String, val currentSeqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received demand request")
final class DeliveryProducerReceivedRequest(val producerId: String, val requestedSeqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received resend request")
final class DeliveryProducerReceivedResend(val producerId: String, val fromSeqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController created")
final class DeliveryConsumerCreated(val actorPath: String) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController started")
final class DeliveryConsumerStarted(val actorPath: String) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received")
final class DeliveryConsumerReceived(val producerId: String, val seqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label(
"Delivery ConsumerController received, previous in progress")
final class DeliveryConsumerReceivedPreviousInProgress(val producerId: String, val seqNr: Long, val stashed: Int)
extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received duplicate")
final class DeliveryConsumerDuplicate(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received missing")
final class DeliveryConsumerMissing(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label(
"Delivery ConsumerController received expected resend")
final class DeliveryConsumerReceivedResend(val seqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController sent demand Request")
final class DeliveryConsumerSentRequest(val producerId: String, val requestedSeqNr: Long) extends Event
/** INTERNAL API */
@InternalApi
@Enabled(true)
@StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController producer changed")
final class DeliveryConsumerChangedProducer(val producerId: String) extends Event

View file

@ -0,0 +1,68 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.jfr
import akka.actor.ActorPath
import akka.actor.typed.ActorSystem
import akka.actor.typed.internal.ActorFlightRecorder
import akka.actor.typed.internal.DeliveryFlightRecorder
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) extends ActorFlightRecorder {
override val delivery: DeliveryFlightRecorder = new JFRDeliveryFlightRecorder
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class JFRDeliveryFlightRecorder extends DeliveryFlightRecorder {
override def producerCreated(producerId: String, path: ActorPath): Unit =
new DeliveryProducerCreated(producerId, path.toString).commit()
override def producerStarted(producerId: String, path: ActorPath): Unit =
new DeliveryProducerStarted(producerId, path.toString).commit()
override def producerRequestNext(producerId: String, currentSeqNr: Long, confirmedSeqNr: Long): Unit =
new DeliveryProducerRequestNext(producerId, currentSeqNr, confirmedSeqNr).commit()
override def producerSent(producerId: String, seqNr: Long): Unit =
new DeliveryProducerSent(producerId, seqNr).commit()
override def producerWaitingForRequest(producerId: String, currentSeqNr: Long): Unit =
new DeliveryProducerWaitingForRequest(producerId, currentSeqNr).commit()
override def producerResentUnconfirmed(producerId: String, fromSeqNr: Long, toSeqNr: Long): Unit =
new DeliveryProducerResentUnconfirmed(producerId, fromSeqNr, toSeqNr).commit()
override def producerResentFirst(producerId: String, firstSeqNr: Long): Unit =
new DeliveryProducerResentFirst(producerId, firstSeqNr).commit()
override def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit =
new DeliveryProducerResentFirstUnconfirmed(producerId, seqNr).commit()
override def producerReceived(producerId: String, currentSeqNr: Long): Unit =
new DeliveryProducerReceived(producerId, currentSeqNr).commit()
override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit =
new DeliveryProducerReceivedRequest(producerId, requestedSeqNr).commit()
override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit =
new DeliveryProducerReceivedResend(producerId, fromSeqNr).commit()
override def consumerCreated(path: ActorPath): Unit =
new DeliveryConsumerCreated(path.toString).commit()
override def consumerStarted(path: ActorPath): Unit =
new DeliveryConsumerStarted(path.toString).commit()
override def consumerReceived(producerId: String, seqNr: Long): Unit =
new DeliveryConsumerReceived(producerId, seqNr).commit()
override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit =
new DeliveryConsumerReceivedPreviousInProgress(producerId, seqNr: Long, stashed).commit()
override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit =
new DeliveryConsumerDuplicate(pid, expectedSeqNr, seqNr).commit()
override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit =
new DeliveryConsumerMissing(pid, expectedSeqNr, seqNr).commit()
override def consumerReceivedResend(seqNr: Long): Unit =
new DeliveryConsumerReceivedResend(seqNr).commit()
override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit =
new DeliveryConsumerSentRequest(producerId, requestedSeqNr).commit()
override def consumerChangedProducer(producerId: String): Unit =
new DeliveryConsumerChangedProducer(producerId).commit()
}

View file

@ -10,6 +10,7 @@ import akka.actor.typed.PostStop
import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.ConsumerController.DeliverThenStop import akka.actor.typed.delivery.ConsumerController.DeliverThenStop
import akka.actor.typed.delivery.ProducerController import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.internal.ActorFlightRecorder
import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
@ -77,6 +78,7 @@ import akka.annotation.InternalApi
private final case class State[A]( private final case class State[A](
producerController: ActorRef[ProducerControllerImpl.InternalCommand], producerController: ActorRef[ProducerControllerImpl.InternalCommand],
producerId: String,
consumer: ActorRef[ConsumerController.Delivery[A]], consumer: ActorRef[ConsumerController.Delivery[A]],
receivedSeqNr: SeqNr, receivedSeqNr: SeqNr,
confirmedSeqNr: SeqNr, confirmedSeqNr: SeqNr,
@ -104,6 +106,8 @@ import akka.annotation.InternalApi
Behaviors Behaviors
.withStash[InternalCommand](settings.flowControlWindow) { stashBuffer => .withStash[InternalCommand](settings.flowControlWindow) { stashBuffer =>
Behaviors.setup { context => Behaviors.setup { context =>
val flightRecorder = ActorFlightRecorder(context.system).delivery
flightRecorder.consumerCreated(context.self.path)
Behaviors.withMdc(msg => mdcForMessage(msg)) { Behaviors.withMdc(msg => mdcForMessage(msg)) {
context.setLoggerName("akka.actor.typed.delivery.ConsumerController") context.setLoggerName("akka.actor.typed.delivery.ConsumerController")
serviceKey.foreach { key => serviceKey.foreach { key =>
@ -122,6 +126,7 @@ import akka.annotation.InternalApi
ConsumerControllerImpl.enforceLocalConsumer(s.deliverTo) ConsumerControllerImpl.enforceLocalConsumer(s.deliverTo)
context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo)) context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo))
flightRecorder.consumerStarted(context.self.path)
val activeBehavior = val activeBehavior =
new ConsumerControllerImpl[A](context, timers, stashBuffer, settings) new ConsumerControllerImpl[A](context, timers, stashBuffer, settings)
.active(initialState(context, s, registering)) .active(initialState(context, s, registering))
@ -177,6 +182,7 @@ import akka.annotation.InternalApi
registering: Option[ActorRef[ProducerController.Command[A]]]): State[A] = { registering: Option[ActorRef[ProducerController.Command[A]]]): State[A] = {
State( State(
producerController = context.system.deadLetters, producerController = context.system.deadLetters,
"n/a",
start.deliverTo, start.deliverTo,
receivedSeqNr = 0, receivedSeqNr = 0,
confirmedSeqNr = 0, confirmedSeqNr = 0,
@ -208,6 +214,8 @@ private class ConsumerControllerImpl[A](
import ProducerControllerImpl.Resend import ProducerControllerImpl.Resend
import settings.flowControlWindow import settings.flowControlWindow
private val flightRecorder = ActorFlightRecorder(context.system).delivery
startRetryTimer() startRetryTimer()
private def resendLost = !settings.onlyFlowControl private def resendLost = !settings.onlyFlowControl
@ -222,6 +230,8 @@ private class ConsumerControllerImpl[A](
val seqNr = seqMsg.seqNr val seqNr = seqMsg.seqNr
val expectedSeqNr = s.receivedSeqNr + 1 val expectedSeqNr = s.receivedSeqNr + 1
flightRecorder.consumerReceived(pid, seqNr)
if (s.isProducerChanged(seqMsg)) { if (s.isProducerChanged(seqMsg)) {
if (seqMsg.first) if (seqMsg.first)
context.log.trace("Received first SequencedMessage seqNr [{}], delivering to consumer.", seqNr) context.log.trace("Received first SequencedMessage seqNr [{}], delivering to consumer.", seqNr)
@ -235,6 +245,7 @@ private class ConsumerControllerImpl[A](
context.log.trace("Received SequencedMessage seqNr [{}], delivering to consumer.", seqNr) context.log.trace("Received SequencedMessage seqNr [{}], delivering to consumer.", seqNr)
deliver(s.copy(receivedSeqNr = seqNr), seqMsg) deliver(s.copy(receivedSeqNr = seqNr), seqMsg)
} else if (seqNr > expectedSeqNr) { } else if (seqNr > expectedSeqNr) {
flightRecorder.consumerMissing(pid, expectedSeqNr, seqNr)
context.log.debugN( context.log.debugN(
"Received SequencedMessage seqNr [{}], but expected [{}], {}.", "Received SequencedMessage seqNr [{}], but expected [{}], {}.",
seqNr, seqNr,
@ -248,6 +259,7 @@ private class ConsumerControllerImpl[A](
waitingForConfirmation(s.copy(receivedSeqNr = seqNr), seqMsg) waitingForConfirmation(s.copy(receivedSeqNr = seqNr), seqMsg)
} }
} else { // seqNr < expectedSeqNr } else { // seqNr < expectedSeqNr
flightRecorder.consumerDuplicate(pid, expectedSeqNr, seqNr)
context.log.debug2("Received duplicate SequencedMessage seqNr [{}], expected [{}].", seqNr, expectedSeqNr) context.log.debug2("Received duplicate SequencedMessage seqNr [{}], expected [{}].", seqNr, expectedSeqNr)
if (seqMsg.first) if (seqMsg.first)
active(retryRequest(s)) active(retryRequest(s))
@ -294,6 +306,7 @@ private class ConsumerControllerImpl[A](
deliver( deliver(
s.copy( s.copy(
producerController = seqMsg.producerController, producerController = seqMsg.producerController,
producerId = seqMsg.producerId,
receivedSeqNr = seqNr, receivedSeqNr = seqNr,
confirmedSeqNr = 0L, confirmedSeqNr = 0L,
requestedSeqNr = newRequestedSeqNr, requestedSeqNr = newRequestedSeqNr,
@ -327,6 +340,7 @@ private class ConsumerControllerImpl[A](
seqMsg.producerController, seqMsg.producerController,
seqMsg.seqNr) seqMsg.seqNr)
} else { } else {
flightRecorder.consumerChangedProducer(seqMsg.producerId)
context.log.debugN( context.log.debugN(
"Changing ProducerController from [{}] to [{}], seqNr [{}].", "Changing ProducerController from [{}] to [{}], seqNr [{}].",
s.producerController, s.producerController,
@ -354,6 +368,7 @@ private class ConsumerControllerImpl[A](
seqNr) seqNr)
Behaviors.same Behaviors.same
} else if (s.isNextExpected(seqMsg)) { } else if (s.isNextExpected(seqMsg)) {
flightRecorder.consumerReceivedResend(seqNr)
context.log.debug("Received missing SequencedMessage seqNr [{}].", seqNr) context.log.debug("Received missing SequencedMessage seqNr [{}].", seqNr)
deliver(s.copy(receivedSeqNr = seqNr), seqMsg) deliver(s.copy(receivedSeqNr = seqNr), seqMsg)
} else { } else {
@ -417,6 +432,7 @@ private class ConsumerControllerImpl[A](
if (seqMsg.first) { if (seqMsg.first) {
// confirm the first message immediately to cancel resending of first // confirm the first message immediately to cancel resending of first
val newRequestedSeqNr = seqNr - 1 + flowControlWindow val newRequestedSeqNr = seqNr - 1 + flowControlWindow
flightRecorder.consumerSentRequest(seqMsg.producerId, newRequestedSeqNr)
context.log.debug( context.log.debug(
"Sending Request after first with confirmedSeqNr [{}], requestUpToSeqNr [{}].", "Sending Request after first with confirmedSeqNr [{}], requestUpToSeqNr [{}].",
seqNr, seqNr,
@ -425,6 +441,7 @@ private class ConsumerControllerImpl[A](
newRequestedSeqNr newRequestedSeqNr
} else if ((s.requestedSeqNr - seqNr) == flowControlWindow / 2) { } else if ((s.requestedSeqNr - seqNr) == flowControlWindow / 2) {
val newRequestedSeqNr = s.requestedSeqNr + flowControlWindow / 2 val newRequestedSeqNr = s.requestedSeqNr + flowControlWindow / 2
flightRecorder.consumerSentRequest(seqMsg.producerId, newRequestedSeqNr)
context.log.debug( context.log.debug(
"Sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].", "Sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].",
seqNr, seqNr,
@ -452,7 +469,9 @@ private class ConsumerControllerImpl[A](
} }
case msg: SequencedMessage[A] => case msg: SequencedMessage[A] =>
flightRecorder.consumerReceivedPreviousInProgress(seqMsg.producerId, seqMsg.seqNr, stashBuffer.size + 1)
if (msg.seqNr == seqMsg.seqNr && msg.producerController == seqMsg.producerController) { if (msg.seqNr == seqMsg.seqNr && msg.producerController == seqMsg.producerController) {
flightRecorder.consumerDuplicate(msg.producerId, seqMsg.seqNr + 1, msg.seqNr)
context.log.debug("Received duplicate SequencedMessage seqNr [{}].", msg.seqNr) context.log.debug("Received duplicate SequencedMessage seqNr [{}].", msg.seqNr)
} else if (stashBuffer.isFull) { } else if (stashBuffer.isFull) {
// possible that the stash is full if ProducerController resends unconfirmed (duplicates) // possible that the stash is full if ProducerController resends unconfirmed (duplicates)
@ -567,6 +586,7 @@ private class ConsumerControllerImpl[A](
// SequenceMessage are arriving. On the other hand it might be too much overhead to reschedule of each // SequenceMessage are arriving. On the other hand it might be too much overhead to reschedule of each
// incoming SequenceMessage. // incoming SequenceMessage.
val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2 val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2
flightRecorder.consumerSentRequest(s.producerId, newRequestedSeqNr)
context.log.debug( context.log.debug(
"Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].", "Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].",
s.confirmedSeqNr, s.confirmedSeqNr,

View file

@ -18,6 +18,7 @@ import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.ConsumerController.SequencedMessage import akka.actor.typed.delivery.ConsumerController.SequencedMessage
import akka.actor.typed.delivery.DurableProducerQueue import akka.actor.typed.delivery.DurableProducerQueue
import akka.actor.typed.delivery.ProducerController import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.internal.ActorFlightRecorder
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.LoggerOps
@ -128,6 +129,7 @@ object ProducerControllerImpl {
settings: ProducerController.Settings): Behavior[Command[A]] = { settings: ProducerController.Settings): Behavior[Command[A]] = {
Behaviors Behaviors
.setup[InternalCommand] { context => .setup[InternalCommand] { context =>
ActorFlightRecorder(context.system).delivery.producerCreated(producerId, context.self.path)
Behaviors.withMdc(staticMdc = Map("producerId" -> producerId)) { Behaviors.withMdc(staticMdc = Map("producerId" -> producerId)) {
context.setLoggerName("akka.actor.typed.delivery.ProducerController") context.setLoggerName("akka.actor.typed.delivery.ProducerController")
val durableQueue = askLoadState(context, durableQueueBehavior, settings) val durableQueue = askLoadState(context, durableQueueBehavior, settings)
@ -161,6 +163,7 @@ object ProducerControllerImpl {
send: ConsumerController.SequencedMessage[A] => Unit): Behavior[Command[A]] = { send: ConsumerController.SequencedMessage[A] => Unit): Behavior[Command[A]] = {
Behaviors Behaviors
.setup[InternalCommand] { context => .setup[InternalCommand] { context =>
ActorFlightRecorder(context.system).delivery.producerCreated(producerId, context.self.path)
Behaviors.withMdc(staticMdc = Map("producerId" -> producerId)) { Behaviors.withMdc(staticMdc = Map("producerId" -> producerId)) {
context.setLoggerName("akka.actor.typed.delivery.ProducerController") context.setLoggerName("akka.actor.typed.delivery.ProducerController")
val durableQueue = askLoadState(context, durableQueueBehavior, settings) val durableQueue = askLoadState(context, durableQueueBehavior, settings)
@ -301,10 +304,13 @@ object ProducerControllerImpl {
state: State[A]): Behavior[InternalCommand] = { state: State[A]): Behavior[InternalCommand] = {
Behaviors.setup { context => Behaviors.setup { context =>
val flightRecorder = ActorFlightRecorder(context.system).delivery
flightRecorder.producerStarted(producerId, context.self.path)
Behaviors.withTimers { timers => Behaviors.withTimers { timers =>
val msgAdapter: ActorRef[A] = context.messageAdapter(msg => Msg(msg)) val msgAdapter: ActorRef[A] = context.messageAdapter(msg => Msg(msg))
val requested = val requested =
if (state.unconfirmed.isEmpty) { if (state.unconfirmed.isEmpty) {
flightRecorder.producerRequestNext(producerId, 1L, 0)
state.producer ! RequestNext(producerId, 1L, 0L, msgAdapter, context.self) state.producer ! RequestNext(producerId, 1L, 0L, msgAdapter, context.self)
true true
} else { } else {
@ -345,6 +351,7 @@ private class ProducerControllerImpl[A: ClassTag](
import ProducerController.Start import ProducerController.Start
import ProducerControllerImpl._ import ProducerControllerImpl._
private val flightRecorder = ActorFlightRecorder(context.system).delivery
// for the durableQueue StoreMessageSent ask // for the durableQueue StoreMessageSent ask
private implicit val askTimeout: Timeout = settings.durableQueueRequestTimeout private implicit val askTimeout: Timeout = settings.durableQueueRequestTimeout
@ -362,11 +369,14 @@ private class ProducerControllerImpl[A: ClassTag](
if (s.currentSeqNr == s.firstSeqNr) if (s.currentSeqNr == s.firstSeqNr)
timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second)
flightRecorder.producerSent(producerId, seqMsg.seqNr)
s.send(seqMsg) s.send(seqMsg)
val newRequested = val newRequested =
if (s.currentSeqNr == s.requestedSeqNr) if (s.currentSeqNr == s.requestedSeqNr) {
flightRecorder.producerWaitingForRequest(producerId, s.currentSeqNr)
false false
else { } else {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr)
s.producer ! RequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr, msgAdapter, context.self) s.producer ! RequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr, msgAdapter, context.self)
true true
} }
@ -391,6 +401,7 @@ private class ProducerControllerImpl[A: ClassTag](
newRequestedSeqNr: SeqNr, newRequestedSeqNr: SeqNr,
supportResend: Boolean, supportResend: Boolean,
viaTimeout: Boolean): Behavior[InternalCommand] = { viaTimeout: Boolean): Behavior[InternalCommand] = {
flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr)
context.log.debugN( context.log.debugN(
"Received Request, confirmed [{}], requested [{}], current [{}]", "Received Request, confirmed [{}], requested [{}], current [{}]",
newConfirmedSeqNr, newConfirmedSeqNr,
@ -422,8 +433,10 @@ private class ProducerControllerImpl[A: ClassTag](
stateAfterAck.currentSeqNr) stateAfterAck.currentSeqNr)
if (newRequestedSeqNr2 > s.requestedSeqNr) { if (newRequestedSeqNr2 > s.requestedSeqNr) {
if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr)
s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self) s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self)
}
active( active(
stateAfterAck.copy( stateAfterAck.copy(
requested = true, requested = true,
@ -485,6 +498,7 @@ private class ProducerControllerImpl[A: ClassTag](
} }
def receiveResend(fromSeqNr: SeqNr): Behavior[InternalCommand] = { def receiveResend(fromSeqNr: SeqNr): Behavior[InternalCommand] = {
flightRecorder.producerReceivedResend(producerId, fromSeqNr)
val newUnconfirmed = val newUnconfirmed =
if (fromSeqNr == 0 && s.unconfirmed.nonEmpty) if (fromSeqNr == 0 && s.unconfirmed.nonEmpty)
s.unconfirmed.head.asFirst +: s.unconfirmed.tail s.unconfirmed.head.asFirst +: s.unconfirmed.tail
@ -495,13 +509,18 @@ private class ProducerControllerImpl[A: ClassTag](
} }
def resendUnconfirmed(newUnconfirmed: Vector[SequencedMessage[A]]): Unit = { def resendUnconfirmed(newUnconfirmed: Vector[SequencedMessage[A]]): Unit = {
if (newUnconfirmed.nonEmpty) if (newUnconfirmed.nonEmpty) {
context.log.debug("Resending [{} - {}].", newUnconfirmed.head.seqNr, newUnconfirmed.last.seqNr) val fromSeqNr = newUnconfirmed.head.seqNr
newUnconfirmed.foreach(s.send) val toSeqNr = newUnconfirmed.last.seqNr
flightRecorder.producerResentUnconfirmed(producerId, fromSeqNr, toSeqNr)
context.log.debug("Resending [{} - {}].", fromSeqNr, toSeqNr)
newUnconfirmed.foreach(s.send)
}
} }
def receiveResendFirstUnconfirmed(): Behavior[InternalCommand] = { def receiveResendFirstUnconfirmed(): Behavior[InternalCommand] = {
if (s.unconfirmed.nonEmpty) { if (s.unconfirmed.nonEmpty) {
flightRecorder.producerResentFirstUnconfirmed(producerId, s.unconfirmed.head.seqNr)
context.log.debug("Resending first unconfirmed [{}].", s.unconfirmed.head.seqNr) context.log.debug("Resending first unconfirmed [{}].", s.unconfirmed.head.seqNr)
s.send(s.unconfirmed.head) s.send(s.unconfirmed.head)
} }
@ -510,6 +529,7 @@ private class ProducerControllerImpl[A: ClassTag](
def receiveResendFirst(): Behavior[InternalCommand] = { def receiveResendFirst(): Behavior[InternalCommand] = {
if (s.unconfirmed.nonEmpty && s.unconfirmed.head.seqNr == s.firstSeqNr) { if (s.unconfirmed.nonEmpty && s.unconfirmed.head.seqNr == s.firstSeqNr) {
flightRecorder.producerResentFirst(producerId, s.firstSeqNr)
context.log.debug("Resending first, [{}].", s.firstSeqNr) context.log.debug("Resending first, [{}].", s.firstSeqNr)
s.send(s.unconfirmed.head.asFirst) s.send(s.unconfirmed.head.asFirst)
} else { } else {
@ -522,8 +542,10 @@ private class ProducerControllerImpl[A: ClassTag](
def receiveStart(start: Start[A]): Behavior[InternalCommand] = { def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
ProducerControllerImpl.enforceLocalProducer(start.producer) ProducerControllerImpl.enforceLocalProducer(start.producer)
context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr) context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr)
if (s.requested) if (s.requested) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr)
start.producer ! RequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr, msgAdapter, context.self) start.producer ! RequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr, msgAdapter, context.self)
}
active(s.copy(producer = start.producer)) active(s.copy(producer = start.producer))
} }
@ -547,6 +569,7 @@ private class ProducerControllerImpl[A: ClassTag](
Behaviors.receiveMessage { Behaviors.receiveMessage {
case MessageWithConfirmation(m: A, replyTo) => case MessageWithConfirmation(m: A, replyTo) =>
flightRecorder.producerReceived(producerId, s.currentSeqNr)
val newReplyAfterStore = s.replyAfterStore.updated(s.currentSeqNr, replyTo) val newReplyAfterStore = s.replyAfterStore.updated(s.currentSeqNr, replyTo)
if (durableQueue.isEmpty) { if (durableQueue.isEmpty) {
onMsg(m, newReplyAfterStore, ack = true) onMsg(m, newReplyAfterStore, ack = true)
@ -558,6 +581,7 @@ private class ProducerControllerImpl[A: ClassTag](
} }
case Msg(m: A) => case Msg(m: A) =>
flightRecorder.producerReceived(producerId, s.currentSeqNr)
if (durableQueue.isEmpty) { if (durableQueue.isEmpty) {
onMsg(m, s.replyAfterStore, ack = false) onMsg(m, s.replyAfterStore, ack = false)
} else { } else {

View file

@ -0,0 +1,115 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorPath
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.annotation.InternalApi
import akka.util.JavaVersion
/**
* INTERNAL API
*/
@InternalApi
object ActorFlightRecorder extends ExtensionId[ActorFlightRecorder] {
override def createExtension(system: ActorSystem[_]): ActorFlightRecorder =
if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) {
// Dynamic instantiation to not trigger class load on earlier JDKs
import scala.language.existentials
system.dynamicAccess.createInstanceFor[ActorFlightRecorder](
"akka.actor.typed.internal.jfr.JFRActorFlightRecorder",
(classOf[ActorSystem[_]], system) :: Nil) match {
case Success(jfr) => jfr
case Failure(ex) =>
system.log.warn("Failed to load JFR Actor flight recorder, falling back to noop. Exception: {}", ex.toString)
NoOpActorFlightRecorder
} // fallback if not possible to dynamically load for some reason
} else
// JFR not available on Java 8
NoOpActorFlightRecorder
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] trait ActorFlightRecorder extends Extension {
val delivery: DeliveryFlightRecorder
}
/**
* INTERNAL API
*/
@InternalApi private[akka] trait DeliveryFlightRecorder {
def producerCreated(producerId: String, path: ActorPath): Unit
def producerStarted(producerId: String, path: ActorPath): Unit
def producerRequestNext(producerId: String, currentSeqNr: Long, confirmedSeqNr: Long): Unit
def producerSent(producerId: String, seqNr: Long): Unit
def producerWaitingForRequest(producerId: String, currentSeqNr: Long): Unit
def producerResentUnconfirmed(producerId: String, fromSeqNr: Long, toSeqNr: Long): Unit
def producerResentFirst(producerId: String, firstSeqNr: Long): Unit
def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit
def producerReceived(producerId: String, currentSeqNr: Long): Unit
def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit
def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit
def consumerCreated(path: ActorPath): Unit
def consumerStarted(path: ActorPath): Unit
def consumerReceived(producerId: String, seqNr: Long): Unit
def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit
def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit
def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit
def consumerReceivedResend(seqNr: Long): Unit
def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit
def consumerChangedProducer(producerId: String): Unit
}
/**
* JFR is only available under certain circumstances (JDK11 for now, possible OpenJDK 8 in the future) so therefore
* the default on JDK 8 needs to be a no-op flight recorder.
*
* INTERNAL
*/
@InternalApi
private[akka] case object NoOpActorFlightRecorder extends ActorFlightRecorder {
override val delivery: DeliveryFlightRecorder = NoOpDeliveryFlightRecorder
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object NoOpDeliveryFlightRecorder extends DeliveryFlightRecorder {
override def producerCreated(producerId: String, path: ActorPath): Unit = ()
override def producerStarted(producerId: String, path: ActorPath): Unit = ()
override def producerRequestNext(producerId: String, currentSeqNr: Long, confirmedSeqNr: Long): Unit = ()
override def producerSent(producerId: String, seqNr: Long): Unit = ()
override def producerWaitingForRequest(producerId: String, currentSeqNr: Long): Unit = ()
override def producerResentUnconfirmed(producerId: String, fromSeqNr: Long, toSeqNr: Long): Unit = ()
override def producerResentFirst(producerId: String, firstSeqNr: Long): Unit = ()
override def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit = ()
override def producerReceived(producerId: String, currentSeqNr: Long): Unit = ()
override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit = ()
override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = ()
override def consumerCreated(path: ActorPath): Unit = ()
override def consumerStarted(path: ActorPath): Unit = ()
override def consumerReceived(producerId: String, seqNr: Long): Unit = ()
override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = ()
override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = ()
override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = ()
override def consumerReceivedResend(seqNr: Long): Unit = ()
override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = ()
override def consumerChangedProducer(producerId: String): Unit = ()
}

View file

@ -4,14 +4,11 @@
<statusListener class="ch.qos.logback.core.status.NopStatusListener" /> <statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<encoder> <encoder>
<pattern>%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n</pattern> <pattern>%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n</pattern>
</encoder> </encoder>
</appender> </appender>
<root level="DEBUG"> <root level="INFO">
<appender-ref ref="STDOUT"/> <appender-ref ref="STDOUT"/>
</root> </root>
</configuration> </configuration>

View file

@ -0,0 +1,157 @@
/*
* Copyright (C) 2014-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.delivery
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
object ReliableDeliveryBenchmark {
final val messagesPerOperation = 100000
final val timeout = 30.seconds
object Producer {
trait Command
case object Run extends Command
private case class WrappedRequestNext(r: ProducerController.RequestNext[Consumer.Command]) extends Command
def apply(
numberOfMessages: Int,
producerController: ActorRef[ProducerController.Command[Consumer.Command]]): Behavior[Command] = {
Behaviors.setup { context =>
val requestNextAdapter =
context.messageAdapter[ProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_))
Behaviors.receiveMessage {
case WrappedRequestNext(next) =>
if (next.confirmedSeqNr >= numberOfMessages) {
context.log.info("Completed {} messages", numberOfMessages)
Behaviors.stopped
} else {
next.sendNextTo ! Consumer.TheMessage
Behaviors.same
}
case Run =>
context.log.info("Starting {} messages", numberOfMessages)
producerController ! ProducerController.Start(requestNextAdapter)
Behaviors.same
}
}
}
}
object Consumer {
trait Command
case object TheMessage extends Command
private case class WrappedDelivery(d: ConsumerController.Delivery[Command]) extends Command
def apply(consumerController: ActorRef[ConsumerController.Command[Command]]): Behavior[Command] = {
Behaviors.setup { context =>
val deliveryAdapter =
context.messageAdapter[ConsumerController.Delivery[Command]](WrappedDelivery(_))
consumerController ! ConsumerController.Start(deliveryAdapter)
Behaviors.receiveMessagePartial {
case WrappedDelivery(d @ ConsumerController.Delivery(_, confirmTo)) =>
context.log.trace("Processed {}", d.seqNr)
confirmTo ! ConsumerController.Confirmed
Behaviors.same
}
}
}
}
object Guardian {
trait Command
final case class Run(id: String, numberOfMessages: Int, replyTo: ActorRef[Done]) extends Command
final case class ProducerTerminated(consumer: ActorRef[Consumer.Command], replyTo: ActorRef[Done]) extends Command
def apply(): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.receiveMessage {
case Run(id, numberOfMessages, replyTo) =>
val consumerController = context.spawn(ConsumerController[Consumer.Command](), s"consumerController-$id")
val consumer = context.spawn(Consumer(consumerController), s"consumer-$id")
val producerController = context.spawn(
ProducerController[Consumer.Command](id, durableQueueBehavior = None),
s"producerController-$id")
val producer = context.spawn(Producer(numberOfMessages, producerController), s"producer-$id")
context.watchWith(producer, ProducerTerminated(consumer, replyTo))
consumerController ! ConsumerController.RegisterToProducerController(producerController)
producer ! Producer.Run
Behaviors.same
case ProducerTerminated(consumer, replyTo) =>
context.stop(consumer)
replyTo ! Done
Behaviors.same
}
}
}
}
}
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(1)
@Threads(1)
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1)
@Measurement(iterations = 10, time = 10, timeUnit = TimeUnit.SECONDS, batchSize = 1)
class ReliableDeliveryBenchmark {
import ReliableDeliveryBenchmark._
@Param(Array("10", "50"))
var window = 0
implicit var system: ActorSystem[Guardian.Command] = _
implicit val askTimeout = akka.util.Timeout(timeout)
@Setup(Level.Trial)
def setup(): Unit = {
system = ActorSystem(
Guardian(),
"ReliableDeliveryBenchmark",
ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.reliable-delivery {
consumer-controller.flow-control-window = $window
}
"""))
}
@TearDown(Level.Trial)
def shutdown(): Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
@Benchmark
@OperationsPerInvocation(messagesPerOperation)
def echo(): Unit = {
Await.result(system.ask(Guardian.Run(UUID.randomUUID().toString, messagesPerOperation, _)), timeout)
}
}

View file

@ -271,7 +271,10 @@ lazy val persistenceTck = akkaModule("akka-persistence-tck")
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
lazy val persistenceTestkit = akkaModule("akka-persistence-testkit") lazy val persistenceTestkit = akkaModule("akka-persistence-testkit")
.dependsOn(persistenceTyped % "compile->compile;provided->provided;test->test", testkit % "compile->compile;test->test", persistenceTck % "test") .dependsOn(
persistenceTyped % "compile->compile;provided->provided;test->test",
testkit % "compile->compile;test->test",
persistenceTck % "test")
.settings(Dependencies.persistenceTestKit) .settings(Dependencies.persistenceTestKit)
.settings(AutomaticModuleName.settings("akka.persistence.testkit")) .settings(AutomaticModuleName.settings("akka.persistence.testkit"))
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
@ -325,7 +328,7 @@ lazy val remote =
lazy val remoteTests = akkaModule("akka-remote-tests") lazy val remoteTests = akkaModule("akka-remote-tests")
.dependsOn( .dependsOn(
actorTests % "test->test", actorTests % "test->test",
remote % "test->test", remote % "compile->CompileJdk9;test->test",
streamTestkit % "test", streamTestkit % "test",
multiNodeTestkit, multiNodeTestkit,
jackson % "test->test") jackson % "test->test")
@ -397,6 +400,7 @@ lazy val actorTyped = akkaModule("akka-actor-typed")
import akka.util.Timeout import akka.util.Timeout
implicit val timeout = Timeout(5.seconds) implicit val timeout = Timeout(5.seconds)
""") """)
.enablePlugins(Jdk9)
lazy val persistenceTyped = akkaModule("akka-persistence-typed") lazy val persistenceTyped = akkaModule("akka-persistence-typed")
.dependsOn( .dependsOn(
@ -434,11 +438,13 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
.dependsOn( .dependsOn(
actorTyped % "compile->CompileJdk9",
clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm", clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm",
clusterSharding, clusterSharding,
actorTestkitTyped % "test->test", actorTestkitTyped % "test->test",
actorTypedTests % "test->test", actorTypedTests % "test->test",
persistenceTyped % "test->test", persistenceTyped % "test->test",
remote % "compile->CompileJdk9;test->test",
remoteTests % "test->test", remoteTests % "test->test",
jackson % "test->test") jackson % "test->test")
.settings(javacOptions += "-parameters") // for Jackson .settings(javacOptions += "-parameters") // for Jackson
@ -465,10 +471,7 @@ lazy val actorTestkitTyped = akkaModule("akka-actor-testkit-typed")
.settings(Dependencies.actorTestkitTyped) .settings(Dependencies.actorTestkitTyped)
lazy val actorTypedTests = akkaModule("akka-actor-typed-tests") lazy val actorTypedTests = akkaModule("akka-actor-typed-tests")
.dependsOn( .dependsOn(actorTyped % "compile->CompileJdk9", actorTestkitTyped % "compile->compile;test->test")
actorTyped,
actorTestkitTyped % "compile->compile;test->test"
)
.settings(AkkaBuild.mayChangeSettings) .settings(AkkaBuild.mayChangeSettings)
.disablePlugins(MimaPlugin) .disablePlugins(MimaPlugin)
.enablePlugins(NoPublish) .enablePlugins(NoPublish)

View file

@ -270,7 +270,7 @@ object Dependencies {
val docs = l ++= Seq(Test.scalatest, Test.junit, Docs.sprayJson, Docs.gson, Provided.levelDB) val docs = l ++= Seq(Test.scalatest, Test.junit, Docs.sprayJson, Docs.gson, Provided.levelDB)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Compile.jctools) val benchJmh = l ++= Seq(logback, Provided.levelDB, Provided.levelDBNative, Compile.jctools)
// akka stream // akka stream