diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala index 032b0cc991..a4a6a477a3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala @@ -191,7 +191,7 @@ import akka.util.Timeout case Some(p) => becomeActive(p, load.state) case None => - // waiting for LoadStateReply + // waiting for Start waitingForStart(producerId, context, stashBuffer, durableQueue, settings, producer, Some(load.state)) } @@ -365,7 +365,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( // wait until more demand false } else if (s.requested && wasStashed) { - // msg was unstashed, but pending request alread in progress + // msg was unstashed, but pending request already in progress true } else if (durableQueue.isDefined && !s.requested && !wasStashed) { // msg ResendDurableMsg, and stashed before storage @@ -460,14 +460,18 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( replyTo ! Done } - s.handOver.get(seqNr).foreach { - case HandOver(oldConfirmationQualifier, oldSeqNr) => - durableQueue.foreach { d => - d ! StoreMessageConfirmed(oldSeqNr, oldConfirmationQualifier, System.currentTimeMillis()) - } - } + val wasHandOver = + s.handOver.get(seqNr) match { + case Some(HandOver(oldConfirmationQualifier, oldSeqNr)) => + durableQueue.foreach { d => + d ! StoreMessageConfirmed(oldSeqNr, oldConfirmationQualifier, System.currentTimeMillis()) + } + true + case None => + false + } - val newState = onMessage(m, wasStashed = false, replyTo = None, seqNr) + val newState = onMessage(m, wasStashed = wasHandOver, replyTo = None, seqNr) active(newState.copy(replyAfterStore = newState.replyAfterStore - seqNr, handOver = newState.handOver - seqNr)) } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/WorkPullingWithEventSourcedProducerQueueSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/WorkPullingWithEventSourcedProducerQueueSpec.scala index 387ac5d140..ed6b058c21 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/WorkPullingWithEventSourcedProducerQueueSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/WorkPullingWithEventSourcedProducerQueueSpec.scala @@ -4,6 +4,7 @@ package akka.persistence.typed.delivery +import scala.concurrent.duration._ import java.util.UUID import java.util.concurrent.atomic.AtomicInteger @@ -72,6 +73,8 @@ class WorkPullingWithEventSourcedProducerQueueSpec testKit.stop(consumerController) consumerProbe.expectTerminated(consumerController) + system.log.info("------------------ Start 2") + val producerController2 = spawn( WorkPullingProducerController[String]( producerId, @@ -82,6 +85,13 @@ class WorkPullingWithEventSourcedProducerQueueSpec val consumerController2 = spawn(ConsumerController[String](serviceKey)) consumerController2 ! ConsumerController.Start(consumerProbe.ref) + // start two consumers (same consumerProbe) to reproduce issue #29854 + val consumerController3 = spawn(ConsumerController[String](serviceKey)) + consumerController3 ! ConsumerController.Start(consumerProbe.ref) + + val requestNext4 = producerProbe.receiveMessage() + producerProbe.expectNoMessage() + val delivery1 = consumerProbe.receiveMessage() delivery1.message should ===("a") delivery1.confirmTo ! ConsumerController.Confirmed @@ -94,7 +104,7 @@ class WorkPullingWithEventSourcedProducerQueueSpec delivery3.message should ===("c") delivery3.confirmTo ! ConsumerController.Confirmed - val requestNext4 = producerProbe.receiveMessage() + producerProbe.expectNoMessage() requestNext4.sendNextTo ! "d" val delivery4 = consumerProbe.receiveMessage() @@ -153,6 +163,7 @@ class WorkPullingWithEventSourcedProducerQueueSpec delivery3.confirmTo ! ConsumerController.Confirmed val requestNext4 = producerProbe.receiveMessage() + producerProbe.expectNoMessage() requestNext4.sendNextTo ! "d" // TODO Should we try harder to deduplicate first? @@ -208,7 +219,10 @@ class WorkPullingWithEventSourcedProducerQueueSpec val batch1 = 15 val confirmed1 = 10 (1 to batch1).foreach { n => - producerProbe.receiveMessage().sendNextTo ! s"msg-$n" + val reqNext = producerProbe.receiveMessage() + if (n == 1 || n == 7 || n == 13) // not checking all because takes too much time + producerProbe.expectNoMessage(50.millis) // issue #29854 + reqNext.sendNextTo ! s"msg-$n" } (1 to confirmed1).foreach { _ => @@ -229,6 +243,10 @@ class WorkPullingWithEventSourcedProducerQueueSpec val consumerController4 = spawn(ConsumerController[String](serviceKey)) consumerController4 ! ConsumerController.Start(consumerProbe.ref) + // start two consumers (same consumerProbe) to reproduce issue #29854 + val consumerController5 = spawn(ConsumerController[String](serviceKey)) + consumerController5 ! ConsumerController.Start(consumerProbe.ref) + val producerController2 = spawn( WorkPullingProducerController[String]( producerId, @@ -238,7 +256,10 @@ class WorkPullingWithEventSourcedProducerQueueSpec val batch2 = 5 (batch1 + 1 to batch1 + batch2).foreach { n => - producerProbe.receiveMessage().sendNextTo ! s"msg-$n" + val reqNext = producerProbe.receiveMessage() + if (n == batch1 + 1 || n == batch1 + 3) // not checking all because takes too much time + producerProbe.expectNoMessage(50.millis) // issue #29854 + reqNext.sendNextTo ! s"msg-$n" } consumerProbe.fishForMessage(consumerProbe.remainingOrDefault) { delivery =>