Harden ReliableDeliveryShardingSpec, #31263 (#31370)

This commit is contained in:
Patrik Nordwall 2022-04-28 07:38:59 +02:00 committed by GitHub
parent af1e756053
commit a485dc0df2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 23 deletions

View file

@ -67,7 +67,7 @@ public class AccountExampleDocTest
public void createWithUnHandle() {
CommandResultWithReply<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
assertFalse(result.hasNoReply());
}

View file

@ -373,33 +373,48 @@ class ReliableDeliveryShardingSpec
delivery3.message should ===(TestConsumer.Job("msg-3"))
// msg-3 not Confirmed
consumerProbes(0).stop()
Thread.sleep(1000) // let it terminate
{
consumerProbes(0).stop()
Thread.sleep(1000) // let it terminate
producerProbe.receiveMessage().sendNextTo ! ShardingEnvelope("entity-1", TestConsumer.Job("msg-4"))
val delivery3b = consumerProbes(1).receiveMessage()
// msg-3 is redelivered
delivery3b.message should ===(TestConsumer.Job("msg-3"))
delivery3b.confirmTo ! ConsumerController.Confirmed
val delivery3cor4 = consumerProbes(1).receiveMessage()
delivery3cor4.message match {
case TestConsumer.Job("msg-3") =>
// It is possible the ProducerController re-sends msg-3 again before it has processed its acknowledgement.
// If the ConsumerController restarts between sending the acknowledgement and receiving that re-sent msg-3,
// it will deliver msg-3 a second time. We then expect msg-4 next:
val delivery4 = consumerProbes(1).receiveMessage()
delivery4.message should ===(TestConsumer.Job("msg-4"))
case TestConsumer.Job("msg-4") =>
// OK!
case other =>
throw new MatchError(other)
producerProbe.receiveMessage().sendNextTo ! ShardingEnvelope("entity-1", TestConsumer.Job("msg-4"))
val delivery3b = consumerProbes(1).receiveMessage()
// msg-3 is redelivered
delivery3b.message should ===(TestConsumer.Job("msg-3"))
delivery3b.confirmTo ! ConsumerController.Confirmed
val delivery3cor4 = consumerProbes(1).receiveMessage()
delivery3cor4.message match {
case TestConsumer.Job("msg-3") =>
// It is possible the ProducerController re-sends msg-3 again before it has processed its acknowledgement.
// If the ConsumerController restarts between sending the acknowledgement and receiving that re-sent msg-3,
// it will deliver msg-3 a second time. We then expect msg-4 next:
val delivery4 = consumerProbes(1).receiveMessage()
delivery4.message should ===(TestConsumer.Job("msg-4"))
case TestConsumer.Job("msg-4") =>
// OK!
case other =>
throw new MatchError(other)
}
}
// redeliver also when no more messages are sent
consumerProbes(1).stop()
{
consumerProbes(1).stop()
val delivery4b = consumerProbes(2).receiveMessage()
delivery4b.message should ===(TestConsumer.Job("msg-4"))
val delivery3cor4 = consumerProbes(2).receiveMessage()
delivery3cor4.message match {
case TestConsumer.Job("msg-3") =>
// It is possible the ProducerController re-sends msg-3 again before it has processed its acknowledgement.
// If the ConsumerController restarts between sending the acknowledgement and receiving that re-sent msg-3,
// it will deliver msg-3 a second time. We then expect msg-4 next:
val delivery4 = consumerProbes(2).receiveMessage()
delivery4.message should ===(TestConsumer.Job("msg-4"))
case TestConsumer.Job("msg-4") =>
// OK!
case other =>
throw new MatchError(other)
}
}
consumerProbes(2).stop()
testKit.stop(shardingProducerController)