diff --git a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/Events.scala b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/Events.scala index a7f2c1ec8a..18e9020caa 100644 --- a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/Events.scala +++ b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/Events.scala @@ -85,7 +85,8 @@ final class DeliveryProducerReceived(val producerId: String, val currentSeqNr: L @Enabled(true) @StackTrace(false) @Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received demand request") -final class DeliveryProducerReceivedRequest(val producerId: String, val requestedSeqNr: Long) extends Event +final class DeliveryProducerReceivedRequest(val producerId: String, val requestedSeqNr: Long, confirmedSeqNr: Long) + extends Event /** INTERNAL API */ @InternalApi @@ -129,14 +130,14 @@ final class DeliveryConsumerReceivedPreviousInProgress(val producerId: String, v @Enabled(true) @StackTrace(false) @Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received duplicate") -final class DeliveryConsumerDuplicate(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event +final class DeliveryConsumerDuplicate(val producerId: String, val expectedSeqNr: Long, val seqNr: Long) extends Event /** INTERNAL API */ @InternalApi @Enabled(true) @StackTrace(false) @Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received missing") -final class DeliveryConsumerMissing(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event +final class DeliveryConsumerMissing(val producerId: String, val expectedSeqNr: Long, val seqNr: Long) extends Event /** INTERNAL API */ @InternalApi diff --git a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala index 6da526fb3e..8f90d0eea7 100644 --- a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala +++ b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala @@ -41,8 +41,8 @@ private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) ext new DeliveryProducerResentFirstUnconfirmed(producerId, seqNr).commit() override def producerReceived(producerId: String, currentSeqNr: Long): Unit = new DeliveryProducerReceived(producerId, currentSeqNr).commit() - override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit = - new DeliveryProducerReceivedRequest(producerId, requestedSeqNr).commit() + override def producerReceivedRequest(producerId: String, requestedSeqNr: Long, confirmedSeqNr: Long): Unit = + new DeliveryProducerReceivedRequest(producerId, requestedSeqNr, confirmedSeqNr).commit() override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = new DeliveryProducerReceivedResend(producerId, fromSeqNr).commit() @@ -54,10 +54,10 @@ private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) ext new DeliveryConsumerReceived(producerId, seqNr).commit() override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = new DeliveryConsumerReceivedPreviousInProgress(producerId, seqNr: Long, stashed).commit() - override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = - new DeliveryConsumerDuplicate(pid, expectedSeqNr, seqNr).commit() - override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = - new DeliveryConsumerMissing(pid, expectedSeqNr, seqNr).commit() + override def consumerDuplicate(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit = + new DeliveryConsumerDuplicate(producerId, expectedSeqNr, seqNr).commit() + override def consumerMissing(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit = + new DeliveryConsumerMissing(producerId, expectedSeqNr, seqNr).commit() override def consumerReceivedResend(seqNr: Long): Unit = new DeliveryConsumerReceivedResend(seqNr).commit() override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala index 82a9fcea5e..3041f68af7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala @@ -486,14 +486,15 @@ private class ConsumerControllerImpl[A]( } case msg: SequencedMessage[A] => - flightRecorder.consumerReceivedPreviousInProgress(seqMsg.producerId, seqMsg.seqNr, stashBuffer.size + 1) - if (msg.seqNr == seqMsg.seqNr && msg.producerController == seqMsg.producerController) { - flightRecorder.consumerDuplicate(msg.producerId, seqMsg.seqNr + 1, msg.seqNr) + flightRecorder.consumerReceivedPreviousInProgress(msg.producerId, msg.seqNr, stashBuffer.size + 1) + val expectedSeqNr = seqMsg.seqNr + stashBuffer.size + 1 + if (msg.seqNr < expectedSeqNr && msg.producerController == seqMsg.producerController) { + flightRecorder.consumerDuplicate(msg.producerId, expectedSeqNr, msg.seqNr) context.log.debug("Received duplicate SequencedMessage seqNr [{}].", msg.seqNr) } else if (stashBuffer.isFull) { // possible that the stash is full if ProducerController resends unconfirmed (duplicates) // dropping them since they can be resent - flightRecorder.consumerStashFull(seqMsg.producerId, seqMsg.seqNr) + flightRecorder.consumerStashFull(msg.producerId, msg.seqNr) context.log.debug( "Received SequencedMessage seqNr [{}], discarding message because stash is full.", msg.seqNr) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala index 00e9ff6794..7c901ff1b5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -401,7 +401,7 @@ private class ProducerControllerImpl[A: ClassTag]( newRequestedSeqNr: SeqNr, supportResend: Boolean, viaTimeout: Boolean): Behavior[InternalCommand] = { - flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr) + flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr, newConfirmedSeqNr) context.log.debugN( "Received Request, confirmed [{}], requested [{}], current [{}]", newConfirmedSeqNr, diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala index b8ffefd820..faf8956b81 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala @@ -60,15 +60,15 @@ private[akka] trait ActorFlightRecorder extends Extension { def producerResentFirst(producerId: String, firstSeqNr: Long): Unit def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit def producerReceived(producerId: String, currentSeqNr: Long): Unit - def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit + def producerReceivedRequest(producerId: String, requestedSeqNr: Long, confirmedSeqNr: Long): Unit def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit def consumerCreated(path: ActorPath): Unit def consumerStarted(path: ActorPath): Unit def consumerReceived(producerId: String, seqNr: Long): Unit def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit - def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit - def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit + def consumerDuplicate(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit + def consumerMissing(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit def consumerReceivedResend(seqNr: Long): Unit def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit def consumerChangedProducer(producerId: String): Unit @@ -100,15 +100,15 @@ private[akka] case object NoOpActorFlightRecorder extends ActorFlightRecorder { override def producerResentFirst(producerId: String, firstSeqNr: Long): Unit = () override def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit = () override def producerReceived(producerId: String, currentSeqNr: Long): Unit = () - override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit = () + override def producerReceivedRequest(producerId: String, requestedSeqNr: Long, confirmedSeqNr: Long): Unit = () override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = () override def consumerCreated(path: ActorPath): Unit = () override def consumerStarted(path: ActorPath): Unit = () override def consumerReceived(producerId: String, seqNr: Long): Unit = () override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = () - override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = () - override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = () + override def consumerDuplicate(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit = () + override def consumerMissing(producerId: String, expectedSeqNr: Long, seqNr: Long): Unit = () override def consumerReceivedResend(seqNr: Long): Unit = () override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = () override def consumerChangedProducer(producerId: String): Unit = ()