avoid filling ConsumerController stash with resent messages

* When running the ReliableDeliveryBenchmark there a stash full
  followed by missing message was noticed in the flight recorder events.
* The reason was that when the initial demand Request confirming seqNr=1
  was received the ProducerController had already sent a bunch more messages,
  and those are redelivered by that demand Request. Filling up the stash in
  the ConsumerController more than the flow control window.
* It sorted itself out, but it's unecessary to stash such messages.
* This is detecting seqNr < expected as duplicates and doesn't stash them.
This commit is contained in:
Patrik Nordwall 2020-03-24 08:13:22 +01:00
parent b4677f58d8
commit 885c46c987
5 changed files with 22 additions and 20 deletions

View file

@ -85,7 +85,8 @@ final class DeliveryProducerReceived(val producerId: String, val currentSeqNr: L
@Enabled(true) @Enabled(true)
@StackTrace(false) @StackTrace(false)
@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received demand request") @Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received demand request")
final class DeliveryProducerReceivedRequest(val producerId: String, val requestedSeqNr: Long) extends Event final class DeliveryProducerReceivedRequest(val producerId: String, val requestedSeqNr: Long, confirmedSeqNr: Long)
extends Event
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
@ -129,14 +130,14 @@ final class DeliveryConsumerReceivedPreviousInProgress(val producerId: String, v
@Enabled(true) @Enabled(true)
@StackTrace(false) @StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received duplicate") @Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received duplicate")
final class DeliveryConsumerDuplicate(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event final class DeliveryConsumerDuplicate(val producerId: String, val expectedSeqNr: Long, val seqNr: Long) extends Event
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi
@Enabled(true) @Enabled(true)
@StackTrace(false) @StackTrace(false)
@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received missing") @Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received missing")
final class DeliveryConsumerMissing(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event final class DeliveryConsumerMissing(val producerId: String, val expectedSeqNr: Long, val seqNr: Long) extends Event
/** INTERNAL API */ /** INTERNAL API */
@InternalApi @InternalApi

View file

@ -41,8 +41,8 @@ private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) ext
new DeliveryProducerResentFirstUnconfirmed(producerId, seqNr).commit() new DeliveryProducerResentFirstUnconfirmed(producerId, seqNr).commit()
override def producerReceived(producerId: String, currentSeqNr: Long): Unit = override def producerReceived(producerId: String, currentSeqNr: Long): Unit =
new DeliveryProducerReceived(producerId, currentSeqNr).commit() new DeliveryProducerReceived(producerId, currentSeqNr).commit()
override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit = override def producerReceivedRequest(producerId: String, requestedSeqNr: Long, confirmedSeqNr: Long): Unit =
new DeliveryProducerReceivedRequest(producerId, requestedSeqNr).commit() new DeliveryProducerReceivedRequest(producerId, requestedSeqNr, confirmedSeqNr).commit()
override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit =
new DeliveryProducerReceivedResend(producerId, fromSeqNr).commit() new DeliveryProducerReceivedResend(producerId, fromSeqNr).commit()
@ -54,10 +54,10 @@ private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) ext
new DeliveryConsumerReceived(producerId, seqNr).commit() new DeliveryConsumerReceived(producerId, seqNr).commit()
override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit =
new DeliveryConsumerReceivedPreviousInProgress(producerId, seqNr: Long, stashed).commit() new DeliveryConsumerReceivedPreviousInProgress(producerId, seqNr: Long, stashed).commit()
override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = override def consumerDuplicate(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit =
new DeliveryConsumerDuplicate(pid, expectedSeqNr, seqNr).commit() new DeliveryConsumerDuplicate(producerId, expectedSeqNr, seqNr).commit()
override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = override def consumerMissing(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit =
new DeliveryConsumerMissing(pid, expectedSeqNr, seqNr).commit() new DeliveryConsumerMissing(producerId, expectedSeqNr, seqNr).commit()
override def consumerReceivedResend(seqNr: Long): Unit = override def consumerReceivedResend(seqNr: Long): Unit =
new DeliveryConsumerReceivedResend(seqNr).commit() new DeliveryConsumerReceivedResend(seqNr).commit()
override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit =

View file

@ -486,14 +486,15 @@ private class ConsumerControllerImpl[A](
} }
case msg: SequencedMessage[A] => case msg: SequencedMessage[A] =>
flightRecorder.consumerReceivedPreviousInProgress(seqMsg.producerId, seqMsg.seqNr, stashBuffer.size + 1) flightRecorder.consumerReceivedPreviousInProgress(msg.producerId, msg.seqNr, stashBuffer.size + 1)
if (msg.seqNr == seqMsg.seqNr && msg.producerController == seqMsg.producerController) { val expectedSeqNr = seqMsg.seqNr + stashBuffer.size + 1
flightRecorder.consumerDuplicate(msg.producerId, seqMsg.seqNr + 1, msg.seqNr) if (msg.seqNr < expectedSeqNr && msg.producerController == seqMsg.producerController) {
flightRecorder.consumerDuplicate(msg.producerId, expectedSeqNr, msg.seqNr)
context.log.debug("Received duplicate SequencedMessage seqNr [{}].", msg.seqNr) context.log.debug("Received duplicate SequencedMessage seqNr [{}].", msg.seqNr)
} else if (stashBuffer.isFull) { } else if (stashBuffer.isFull) {
// possible that the stash is full if ProducerController resends unconfirmed (duplicates) // possible that the stash is full if ProducerController resends unconfirmed (duplicates)
// dropping them since they can be resent // dropping them since they can be resent
flightRecorder.consumerStashFull(seqMsg.producerId, seqMsg.seqNr) flightRecorder.consumerStashFull(msg.producerId, msg.seqNr)
context.log.debug( context.log.debug(
"Received SequencedMessage seqNr [{}], discarding message because stash is full.", "Received SequencedMessage seqNr [{}], discarding message because stash is full.",
msg.seqNr) msg.seqNr)

View file

@ -401,7 +401,7 @@ private class ProducerControllerImpl[A: ClassTag](
newRequestedSeqNr: SeqNr, newRequestedSeqNr: SeqNr,
supportResend: Boolean, supportResend: Boolean,
viaTimeout: Boolean): Behavior[InternalCommand] = { viaTimeout: Boolean): Behavior[InternalCommand] = {
flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr) flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr, newConfirmedSeqNr)
context.log.debugN( context.log.debugN(
"Received Request, confirmed [{}], requested [{}], current [{}]", "Received Request, confirmed [{}], requested [{}], current [{}]",
newConfirmedSeqNr, newConfirmedSeqNr,

View file

@ -60,15 +60,15 @@ private[akka] trait ActorFlightRecorder extends Extension {
def producerResentFirst(producerId: String, firstSeqNr: Long): Unit def producerResentFirst(producerId: String, firstSeqNr: Long): Unit
def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit
def producerReceived(producerId: String, currentSeqNr: Long): Unit def producerReceived(producerId: String, currentSeqNr: Long): Unit
def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit def producerReceivedRequest(producerId: String, requestedSeqNr: Long, confirmedSeqNr: Long): Unit
def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit
def consumerCreated(path: ActorPath): Unit def consumerCreated(path: ActorPath): Unit
def consumerStarted(path: ActorPath): Unit def consumerStarted(path: ActorPath): Unit
def consumerReceived(producerId: String, seqNr: Long): Unit def consumerReceived(producerId: String, seqNr: Long): Unit
def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit
def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit def consumerDuplicate(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit
def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit def consumerMissing(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit
def consumerReceivedResend(seqNr: Long): Unit def consumerReceivedResend(seqNr: Long): Unit
def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit
def consumerChangedProducer(producerId: String): Unit def consumerChangedProducer(producerId: String): Unit
@ -100,15 +100,15 @@ private[akka] case object NoOpActorFlightRecorder extends ActorFlightRecorder {
override def producerResentFirst(producerId: String, firstSeqNr: Long): Unit = () override def producerResentFirst(producerId: String, firstSeqNr: Long): Unit = ()
override def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit = () override def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit = ()
override def producerReceived(producerId: String, currentSeqNr: Long): Unit = () override def producerReceived(producerId: String, currentSeqNr: Long): Unit = ()
override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit = () override def producerReceivedRequest(producerId: String, requestedSeqNr: Long, confirmedSeqNr: Long): Unit = ()
override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = () override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = ()
override def consumerCreated(path: ActorPath): Unit = () override def consumerCreated(path: ActorPath): Unit = ()
override def consumerStarted(path: ActorPath): Unit = () override def consumerStarted(path: ActorPath): Unit = ()
override def consumerReceived(producerId: String, seqNr: Long): Unit = () override def consumerReceived(producerId: String, seqNr: Long): Unit = ()
override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = () override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = ()
override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = () override def consumerDuplicate(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit = ()
override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = () override def consumerMissing(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit = ()
override def consumerReceivedResend(seqNr: Long): Unit = () override def consumerReceivedResend(seqNr: Long): Unit = ()
override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = () override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = ()
override def consumerChangedProducer(producerId: String): Unit = () override def consumerChangedProducer(producerId: String): Unit = ()