Merge pull request #28878 from akka/wip-delivery-log-perf-patriknw
avoid touching MDC for context.log.trace in reliable delivery
This commit is contained in:
commit
6361bdb153
6 changed files with 58 additions and 28 deletions
|
|
@ -228,6 +228,8 @@ private class ConsumerControllerImpl[A](
|
|||
|
||||
private val flightRecorder = ActorFlightRecorder(context.system).delivery
|
||||
|
||||
private val traceEnabled = context.log.isTraceEnabled
|
||||
|
||||
startRetryTimer()
|
||||
|
||||
private def resendLost = !settings.onlyFlowControl
|
||||
|
|
@ -245,7 +247,7 @@ private class ConsumerControllerImpl[A](
|
|||
flightRecorder.consumerReceived(pid, seqNr)
|
||||
|
||||
if (s.isProducerChanged(seqMsg)) {
|
||||
if (seqMsg.first)
|
||||
if (seqMsg.first && traceEnabled)
|
||||
context.log.trace("Received first SequencedMessage seqNr [{}], delivering to consumer.", seqNr)
|
||||
receiveChangedProducer(s, seqMsg)
|
||||
} else if (s.registering.isDefined) {
|
||||
|
|
@ -254,7 +256,8 @@ private class ConsumerControllerImpl[A](
|
|||
seqNr)
|
||||
stashBuffer.unstash(Behaviors.same, 1, scalaIdentityFunction)
|
||||
} else if (s.isNextExpected(seqMsg)) {
|
||||
context.log.trace("Received SequencedMessage seqNr [{}], delivering to consumer.", seqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Received SequencedMessage seqNr [{}], delivering to consumer.", seqNr)
|
||||
deliver(s.copy(receivedSeqNr = seqNr), seqMsg)
|
||||
} else if (seqNr > expectedSeqNr) {
|
||||
flightRecorder.consumerMissing(pid, expectedSeqNr, seqNr)
|
||||
|
|
@ -374,7 +377,7 @@ private class ConsumerControllerImpl[A](
|
|||
val seqNr = seqMsg.seqNr
|
||||
|
||||
if (s.isProducerChanged(seqMsg)) {
|
||||
if (seqMsg.first)
|
||||
if (seqMsg.first && traceEnabled)
|
||||
context.log.trace("Received first SequencedMessage seqNr [{}], delivering to consumer.", seqNr)
|
||||
receiveChangedProducer(s, seqMsg)
|
||||
} else if (s.registering.isDefined) {
|
||||
|
|
@ -441,7 +444,11 @@ private class ConsumerControllerImpl[A](
|
|||
.receiveMessage[InternalCommand] {
|
||||
case Confirmed =>
|
||||
val seqNr = seqMsg.seqNr
|
||||
context.log.trace("Received Confirmed seqNr [{}] from consumer, stashed size [{}].", seqNr, stashBuffer.size)
|
||||
if (traceEnabled)
|
||||
context.log.trace(
|
||||
"Received Confirmed seqNr [{}] from consumer, stashed size [{}].",
|
||||
seqNr,
|
||||
stashBuffer.size)
|
||||
|
||||
val newRequestedSeqNr =
|
||||
if (seqMsg.first) {
|
||||
|
|
@ -466,7 +473,8 @@ private class ConsumerControllerImpl[A](
|
|||
newRequestedSeqNr
|
||||
} else {
|
||||
if (seqMsg.ack) {
|
||||
context.log.trace("Sending Ack seqNr [{}].", seqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Sending Ack seqNr [{}].", seqNr)
|
||||
s.producerController ! Ack(confirmedSeqNr = seqNr)
|
||||
}
|
||||
s.requestedSeqNr
|
||||
|
|
@ -499,7 +507,7 @@ private class ConsumerControllerImpl[A](
|
|||
"Received SequencedMessage seqNr [{}], discarding message because stash is full.",
|
||||
msg.seqNr)
|
||||
} else {
|
||||
if (context.log.isTraceEnabled())
|
||||
if (traceEnabled)
|
||||
context.log.traceN(
|
||||
"Received SequencedMessage seqNr [{}], stashing while waiting for consumer to confirm [{}], stashed size [{}].",
|
||||
msg.seqNr,
|
||||
|
|
|
|||
|
|
@ -352,6 +352,7 @@ private class ProducerControllerImpl[A: ClassTag](
|
|||
import ProducerControllerImpl._
|
||||
|
||||
private val flightRecorder = ActorFlightRecorder(context.system).delivery
|
||||
private val traceEnabled = context.log.isTraceEnabled
|
||||
// for the durableQueue StoreMessageSent ask
|
||||
private implicit val askTimeout: Timeout = settings.durableQueueRequestTimeout
|
||||
|
||||
|
|
@ -359,7 +360,7 @@ private class ProducerControllerImpl[A: ClassTag](
|
|||
|
||||
def onMsg(m: A, newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]], ack: Boolean): Behavior[InternalCommand] = {
|
||||
checkOnMsgRequestedState()
|
||||
if (context.log.isTraceEnabled)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Sending [{}] with seqNr [{}].", m.getClass.getName, s.currentSeqNr)
|
||||
val seqMsg = SequencedMessage(producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self)
|
||||
val newUnconfirmed =
|
||||
|
|
@ -449,7 +450,8 @@ private class ProducerControllerImpl[A: ClassTag](
|
|||
}
|
||||
|
||||
def receiveAck(newConfirmedSeqNr: SeqNr): Behavior[InternalCommand] = {
|
||||
context.log.trace2("Received Ack, confirmed [{}], current [{}].", newConfirmedSeqNr, s.currentSeqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace2("Received Ack, confirmed [{}], current [{}].", newConfirmedSeqNr, s.currentSeqNr)
|
||||
val stateAfterAck = onAck(newConfirmedSeqNr)
|
||||
if (newConfirmedSeqNr == s.firstSeqNr && stateAfterAck.unconfirmed.nonEmpty) {
|
||||
resendUnconfirmed(stateAfterAck.unconfirmed)
|
||||
|
|
@ -459,7 +461,7 @@ private class ProducerControllerImpl[A: ClassTag](
|
|||
|
||||
def onAck(newConfirmedSeqNr: SeqNr): State[A] = {
|
||||
val (replies, newReplyAfterStore) = s.replyAfterStore.partition { case (seqNr, _) => seqNr <= newConfirmedSeqNr }
|
||||
if (replies.nonEmpty)
|
||||
if (replies.nonEmpty && traceEnabled)
|
||||
context.log.trace("Sending confirmation replies from [{}] to [{}].", replies.head._1, replies.last._1)
|
||||
replies.foreach {
|
||||
case (seqNr, replyTo) => replyTo ! seqNr
|
||||
|
|
@ -489,7 +491,8 @@ private class ProducerControllerImpl[A: ClassTag](
|
|||
throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]")
|
||||
|
||||
s.replyAfterStore.get(seqNr).foreach { replyTo =>
|
||||
context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr)
|
||||
replyTo ! seqNr
|
||||
}
|
||||
val newReplyAfterStore = s.replyAfterStore - seqNr
|
||||
|
|
|
|||
|
|
@ -278,6 +278,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
import WorkPullingProducerController.WorkerStats
|
||||
import WorkPullingProducerControllerImpl._
|
||||
|
||||
private val traceEnabled = context.log.isTraceEnabled
|
||||
private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout
|
||||
private val workerAskTimeout: Timeout = settings.internalAskTimeout
|
||||
|
||||
|
|
@ -288,7 +289,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
|
||||
def onMessage(msg: A, wasStashed: Boolean, replyTo: Option[ActorRef[Done]], totalSeqNr: TotalSeqNr): State[A] = {
|
||||
val consumersWithDemand = s.out.iterator.filter { case (_, out) => out.askNextTo.isDefined }.toVector
|
||||
if (context.log.isTraceEnabled)
|
||||
if (traceEnabled)
|
||||
context.log.traceN(
|
||||
"Received message seqNr [{}], wasStashed [{}], consumersWithDemand [{}], hasRequested [{}].",
|
||||
totalSeqNr,
|
||||
|
|
@ -337,7 +338,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
}
|
||||
|
||||
def tellRequestNext(): Unit = {
|
||||
context.log.trace("Sending RequestNext to producer, seqNr [{}].", totalSeqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Sending RequestNext to producer, seqNr [{}].", totalSeqNr)
|
||||
s.producer ! requestNext
|
||||
}
|
||||
|
||||
|
|
@ -452,7 +454,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
|
||||
def receiveStoreMessageSentCompleted(seqNr: SeqNr, m: A) = {
|
||||
s.replyAfterStore.get(seqNr).foreach { replyTo =>
|
||||
context.log.trace("Sending reply for seqNr [{}] after storage.", seqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Sending reply for seqNr [{}] after storage.", seqNr)
|
||||
replyTo ! Done
|
||||
}
|
||||
|
||||
|
|
@ -484,7 +487,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
}
|
||||
|
||||
if (confirmed.nonEmpty) {
|
||||
context.log.trace("Received Ack seqNr [{}] from worker [{}].", confirmedSeqNr, outState.confirmationQualifier)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Received Ack seqNr [{}] from worker [{}].", confirmedSeqNr, outState.confirmationQualifier)
|
||||
confirmed.foreach {
|
||||
case Unconfirmed(_, _, _, None) => // no reply
|
||||
case Unconfirmed(_, _, _, Some(replyTo)) =>
|
||||
|
|
@ -509,10 +513,11 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
s.out.get(outKey) match {
|
||||
case Some(outState) =>
|
||||
val confirmedSeqNr = w.next.confirmedSeqNr
|
||||
context.log.trace2(
|
||||
"Received RequestNext from worker [{}], confirmedSeqNr [{}].",
|
||||
w.next.producerId,
|
||||
confirmedSeqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace2(
|
||||
"Received RequestNext from worker [{}], confirmedSeqNr [{}].",
|
||||
w.next.producerId,
|
||||
confirmedSeqNr)
|
||||
|
||||
val newUnconfirmed = onAck(outState, confirmedSeqNr)
|
||||
|
||||
|
|
@ -528,7 +533,9 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
} else if (s.requested) {
|
||||
active(s.copy(out = newOut))
|
||||
} else {
|
||||
context.log.trace("Sending RequestNext to producer after RequestNext from worker [{}].", w.next.producerId)
|
||||
if (traceEnabled)
|
||||
context.log
|
||||
.trace("Sending RequestNext to producer after RequestNext from worker [{}].", w.next.producerId)
|
||||
s.producer ! requestNext
|
||||
active(s.copy(out = newOut, requested = true))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -82,13 +82,15 @@ object Consumer {
|
|||
|
||||
def apply(consumerController: ActorRef[ConsumerController.Command[Command]]): Behavior[Command] = {
|
||||
Behaviors.setup { context =>
|
||||
val traceEnabled = context.log.isTraceEnabled
|
||||
val deliveryAdapter =
|
||||
context.messageAdapter[ConsumerController.Delivery[Command]](WrappedDelivery(_))
|
||||
consumerController ! ConsumerController.Start(deliveryAdapter)
|
||||
|
||||
Behaviors.receiveMessagePartial {
|
||||
case WrappedDelivery(d @ ConsumerController.Delivery(_, confirmTo)) =>
|
||||
context.log.trace("Processed {}", d.seqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Processed {}", d.seqNr)
|
||||
confirmTo ! ConsumerController.Confirmed
|
||||
Behaviors.same
|
||||
}
|
||||
|
|
|
|||
|
|
@ -281,6 +281,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
|||
|
||||
private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout
|
||||
private val entityAskTimeout: Timeout = settings.internalAskTimeout
|
||||
private val traceEnabled = context.log.isTraceEnabled
|
||||
|
||||
private val requestNextAdapter: ActorRef[ProducerController.RequestNext[A]] =
|
||||
context.messageAdapter(WrappedRequestNext.apply)
|
||||
|
|
@ -404,7 +405,8 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
|||
def receiveAck(ack: Ack): Behavior[InternalCommand] = {
|
||||
s.out.get(ack.outKey) match {
|
||||
case Some(outState) =>
|
||||
context.log.trace2("Received Ack, confirmed [{}], current [{}].", ack.confirmedSeqNr, s.currentSeqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace2("Received Ack, confirmed [{}], current [{}].", ack.confirmedSeqNr, s.currentSeqNr)
|
||||
val newUnconfirmed = onAck(outState, ack.confirmedSeqNr)
|
||||
val newUsedNanoTime =
|
||||
if (newUnconfirmed.size != outState.unconfirmed.size) System.nanoTime() else outState.usedNanoTime
|
||||
|
|
@ -426,7 +428,8 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
|||
throw new IllegalStateException(s"Received RequestNext but already has demand for [$outKey]")
|
||||
|
||||
val confirmedSeqNr = w.next.confirmedSeqNr
|
||||
context.log.trace("Received RequestNext from [{}], confirmed seqNr [{}]", out.entityId, confirmedSeqNr)
|
||||
if (traceEnabled)
|
||||
context.log.trace("Received RequestNext from [{}], confirmed seqNr [{}]", out.entityId, confirmedSeqNr)
|
||||
val newUnconfirmed = onAck(out, confirmedSeqNr)
|
||||
|
||||
if (out.buffered.nonEmpty) {
|
||||
|
|
@ -573,7 +576,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
|||
}
|
||||
|
||||
private def send(msg: A, outKey: OutKey, outSeqNr: OutSeqNr, nextTo: ProducerController.RequestNext[A]): Unit = {
|
||||
if (context.log.isTraceEnabled)
|
||||
if (traceEnabled)
|
||||
context.log.traceN("Sending [{}] to [{}] with outSeqNr [{}].", msg.getClass.getName, outKey, outSeqNr)
|
||||
implicit val askTimeout: Timeout = entityAskTimeout
|
||||
context.ask[ProducerController.MessageWithConfirmation[A], OutSeqNr](
|
||||
|
|
|
|||
|
|
@ -219,14 +219,17 @@ private class EventSourcedProducerQueue[A](
|
|||
cleanupUnusedAfter: FiniteDuration) {
|
||||
import DurableProducerQueue._
|
||||
|
||||
private val traceEnabled = context.log.isTraceEnabled
|
||||
|
||||
def onCommand(state: State[A], command: Command[A]): Effect[Event, State[A]] = {
|
||||
command match {
|
||||
case StoreMessageSent(sent, replyTo) =>
|
||||
if (sent.seqNr == state.currentSeqNr) {
|
||||
context.log.trace(
|
||||
"StoreMessageSent seqNr [{}], confirmationQualifier [{}]",
|
||||
sent.seqNr,
|
||||
sent.confirmationQualifier)
|
||||
if (traceEnabled)
|
||||
context.log.trace(
|
||||
"StoreMessageSent seqNr [{}], confirmationQualifier [{}]",
|
||||
sent.seqNr,
|
||||
sent.confirmationQualifier)
|
||||
Effect.persist(sent).thenReply(replyTo)(_ => StoreMessageSentAck(sent.seqNr))
|
||||
} else if (sent.seqNr == state.currentSeqNr - 1) {
|
||||
// already stored, could be a retry after timout
|
||||
|
|
@ -239,7 +242,11 @@ private class EventSourcedProducerQueue[A](
|
|||
}
|
||||
|
||||
case StoreMessageConfirmed(seqNr, confirmationQualifier, timestampMillis) =>
|
||||
context.log.trace("StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]", seqNr, confirmationQualifier)
|
||||
if (traceEnabled)
|
||||
context.log.trace(
|
||||
"StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]",
|
||||
seqNr,
|
||||
confirmationQualifier)
|
||||
val previousConfirmedSeqNr = state.confirmedSeqNr.get(confirmationQualifier) match {
|
||||
case Some((nr, _)) => nr
|
||||
case None => 0L
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue