Merge pull request #29873 from akka/wip-29854-RequestNext-bug-patriknw

Fix duplicate RequestNext messages in durable WorkPullingProducerController, #29854
This commit is contained in:
Patrik Nordwall 2021-01-13 17:20:32 +01:00 committed by GitHub
commit feec7aa9b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 12 deletions

View file

@ -191,7 +191,7 @@ import akka.util.Timeout
case Some(p) =>
becomeActive(p, load.state)
case None =>
// waiting for LoadStateReply
// waiting for Start
waitingForStart(producerId, context, stashBuffer, durableQueue, settings, producer, Some(load.state))
}
@ -365,7 +365,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
// wait until more demand
false
} else if (s.requested && wasStashed) {
// msg was unstashed, but pending request alread in progress
// msg was unstashed, but pending request already in progress
true
} else if (durableQueue.isDefined && !s.requested && !wasStashed) {
// msg ResendDurableMsg, and stashed before storage
@ -460,14 +460,18 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
replyTo ! Done
}
s.handOver.get(seqNr).foreach {
case HandOver(oldConfirmationQualifier, oldSeqNr) =>
val wasHandOver =
s.handOver.get(seqNr) match {
case Some(HandOver(oldConfirmationQualifier, oldSeqNr)) =>
durableQueue.foreach { d =>
d ! StoreMessageConfirmed(oldSeqNr, oldConfirmationQualifier, System.currentTimeMillis())
}
true
case None =>
false
}
val newState = onMessage(m, wasStashed = false, replyTo = None, seqNr)
val newState = onMessage(m, wasStashed = wasHandOver, replyTo = None, seqNr)
active(newState.copy(replyAfterStore = newState.replyAfterStore - seqNr, handOver = newState.handOver - seqNr))
}

View file

@ -4,6 +4,7 @@
package akka.persistence.typed.delivery
import scala.concurrent.duration._
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
@ -72,6 +73,8 @@ class WorkPullingWithEventSourcedProducerQueueSpec
testKit.stop(consumerController)
consumerProbe.expectTerminated(consumerController)
system.log.info("------------------ Start 2")
val producerController2 = spawn(
WorkPullingProducerController[String](
producerId,
@ -82,6 +85,13 @@ class WorkPullingWithEventSourcedProducerQueueSpec
val consumerController2 = spawn(ConsumerController[String](serviceKey))
consumerController2 ! ConsumerController.Start(consumerProbe.ref)
// start two consumers (same consumerProbe) to reproduce issue #29854
val consumerController3 = spawn(ConsumerController[String](serviceKey))
consumerController3 ! ConsumerController.Start(consumerProbe.ref)
val requestNext4 = producerProbe.receiveMessage()
producerProbe.expectNoMessage()
val delivery1 = consumerProbe.receiveMessage()
delivery1.message should ===("a")
delivery1.confirmTo ! ConsumerController.Confirmed
@ -94,7 +104,7 @@ class WorkPullingWithEventSourcedProducerQueueSpec
delivery3.message should ===("c")
delivery3.confirmTo ! ConsumerController.Confirmed
val requestNext4 = producerProbe.receiveMessage()
producerProbe.expectNoMessage()
requestNext4.sendNextTo ! "d"
val delivery4 = consumerProbe.receiveMessage()
@ -153,6 +163,7 @@ class WorkPullingWithEventSourcedProducerQueueSpec
delivery3.confirmTo ! ConsumerController.Confirmed
val requestNext4 = producerProbe.receiveMessage()
producerProbe.expectNoMessage()
requestNext4.sendNextTo ! "d"
// TODO Should we try harder to deduplicate first?
@ -208,7 +219,10 @@ class WorkPullingWithEventSourcedProducerQueueSpec
val batch1 = 15
val confirmed1 = 10
(1 to batch1).foreach { n =>
producerProbe.receiveMessage().sendNextTo ! s"msg-$n"
val reqNext = producerProbe.receiveMessage()
if (n == 1 || n == 7 || n == 13) // not checking all because takes too much time
producerProbe.expectNoMessage(50.millis) // issue #29854
reqNext.sendNextTo ! s"msg-$n"
}
(1 to confirmed1).foreach { _ =>
@ -229,6 +243,10 @@ class WorkPullingWithEventSourcedProducerQueueSpec
val consumerController4 = spawn(ConsumerController[String](serviceKey))
consumerController4 ! ConsumerController.Start(consumerProbe.ref)
// start two consumers (same consumerProbe) to reproduce issue #29854
val consumerController5 = spawn(ConsumerController[String](serviceKey))
consumerController5 ! ConsumerController.Start(consumerProbe.ref)
val producerController2 = spawn(
WorkPullingProducerController[String](
producerId,
@ -238,7 +256,10 @@ class WorkPullingWithEventSourcedProducerQueueSpec
val batch2 = 5
(batch1 + 1 to batch1 + batch2).foreach { n =>
producerProbe.receiveMessage().sendNextTo ! s"msg-$n"
val reqNext = producerProbe.receiveMessage()
if (n == batch1 + 1 || n == batch1 + 3) // not checking all because takes too much time
producerProbe.expectNoMessage(50.millis) // issue #29854
reqNext.sendNextTo ! s"msg-$n"
}
consumerProbe.fishForMessage(consumerProbe.remainingOrDefault) { delivery =>