From a485dc0df28eff38f848a749af8a6413b5ec3379 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Apr 2022 07:38:59 +0200 Subject: [PATCH] Harden ReliableDeliveryShardingSpec, #31263 (#31370) --- .../sharding/typed/AccountExampleDocTest.java | 2 +- .../ReliableDeliveryShardingSpec.scala | 59 ++++++++++++------- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java index 0dcae9c2e9..d9fbdad3c6 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java @@ -67,7 +67,7 @@ public class AccountExampleDocTest public void createWithUnHandle() { CommandResultWithReply< AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply> - result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); + result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); assertFalse(result.hasNoReply()); } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala index 5870855586..4a2166f771 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala @@ -373,33 +373,48 @@ class ReliableDeliveryShardingSpec delivery3.message should ===(TestConsumer.Job("msg-3")) // msg-3 not Confirmed - consumerProbes(0).stop() - Thread.sleep(1000) // let it terminate + { + consumerProbes(0).stop() + Thread.sleep(1000) // let it terminate - producerProbe.receiveMessage().sendNextTo ! ShardingEnvelope("entity-1", TestConsumer.Job("msg-4")) - val delivery3b = consumerProbes(1).receiveMessage() - // msg-3 is redelivered - delivery3b.message should ===(TestConsumer.Job("msg-3")) - delivery3b.confirmTo ! ConsumerController.Confirmed - val delivery3cor4 = consumerProbes(1).receiveMessage() - delivery3cor4.message match { - case TestConsumer.Job("msg-3") => - // It is possible the ProducerController re-sends msg-3 again before it has processed its acknowledgement. - // If the ConsumerController restarts between sending the acknowledgement and receiving that re-sent msg-3, - // it will deliver msg-3 a second time. We then expect msg-4 next: - val delivery4 = consumerProbes(1).receiveMessage() - delivery4.message should ===(TestConsumer.Job("msg-4")) - case TestConsumer.Job("msg-4") => - // OK! - case other => - throw new MatchError(other) + producerProbe.receiveMessage().sendNextTo ! ShardingEnvelope("entity-1", TestConsumer.Job("msg-4")) + val delivery3b = consumerProbes(1).receiveMessage() + // msg-3 is redelivered + delivery3b.message should ===(TestConsumer.Job("msg-3")) + delivery3b.confirmTo ! ConsumerController.Confirmed + val delivery3cor4 = consumerProbes(1).receiveMessage() + delivery3cor4.message match { + case TestConsumer.Job("msg-3") => + // It is possible the ProducerController re-sends msg-3 again before it has processed its acknowledgement. + // If the ConsumerController restarts between sending the acknowledgement and receiving that re-sent msg-3, + // it will deliver msg-3 a second time. We then expect msg-4 next: + val delivery4 = consumerProbes(1).receiveMessage() + delivery4.message should ===(TestConsumer.Job("msg-4")) + case TestConsumer.Job("msg-4") => + // OK! + case other => + throw new MatchError(other) + } } // redeliver also when no more messages are sent - consumerProbes(1).stop() + { + consumerProbes(1).stop() - val delivery4b = consumerProbes(2).receiveMessage() - delivery4b.message should ===(TestConsumer.Job("msg-4")) + val delivery3cor4 = consumerProbes(2).receiveMessage() + delivery3cor4.message match { + case TestConsumer.Job("msg-3") => + // It is possible the ProducerController re-sends msg-3 again before it has processed its acknowledgement. + // If the ConsumerController restarts between sending the acknowledgement and receiving that re-sent msg-3, + // it will deliver msg-3 a second time. We then expect msg-4 next: + val delivery4 = consumerProbes(2).receiveMessage() + delivery4.message should ===(TestConsumer.Job("msg-4")) + case TestConsumer.Job("msg-4") => + // OK! + case other => + throw new MatchError(other) + } + } consumerProbes(2).stop() testKit.stop(shardingProducerController)