Harden ReliableDeliveryShardingSpec (#31234)
Allow duplicate messages to be delivered to the consumer
This commit is contained in:
parent
f6e73fa5ac
commit
c63d18f663
1 changed files with 13 additions and 7 deletions
|
|
@ -32,8 +32,6 @@ import akka.cluster.typed.Join
|
||||||
|
|
||||||
object ReliableDeliveryShardingSpec {
|
object ReliableDeliveryShardingSpec {
|
||||||
val config = ConfigFactory.parseString("""
|
val config = ConfigFactory.parseString("""
|
||||||
// temporary on debug to diagnose https://github.com/akka/akka/issues/30664
|
|
||||||
akka.loglevel = debug
|
|
||||||
akka.actor.provider = cluster
|
akka.actor.provider = cluster
|
||||||
akka.remote.classic.netty.tcp.port = 0
|
akka.remote.classic.netty.tcp.port = 0
|
||||||
akka.remote.artery.canonical.port = 0
|
akka.remote.artery.canonical.port = 0
|
||||||
|
|
@ -337,9 +335,6 @@ class ReliableDeliveryShardingSpec
|
||||||
testKit.stop(shardingProducerController)
|
testKit.stop(shardingProducerController)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This test sometimes files on GitHub Actions (https://github.com/akka/akka/issues/30664),
|
|
||||||
// which might be a real issue.
|
|
||||||
// It seems msg-3, which is 'correctly' redelivered once, is sometimes redelivered twice.
|
|
||||||
"deliver unconfirmed if ShardingConsumerController is terminated" in {
|
"deliver unconfirmed if ShardingConsumerController is terminated" in {
|
||||||
// for example if ShardingConsumerController is rebalanced, but no more messages are sent to the entity
|
// for example if ShardingConsumerController is rebalanced, but no more messages are sent to the entity
|
||||||
nextId()
|
nextId()
|
||||||
|
|
@ -386,8 +381,19 @@ class ReliableDeliveryShardingSpec
|
||||||
// msg-3 is redelivered
|
// msg-3 is redelivered
|
||||||
delivery3b.message should ===(TestConsumer.Job("msg-3"))
|
delivery3b.message should ===(TestConsumer.Job("msg-3"))
|
||||||
delivery3b.confirmTo ! ConsumerController.Confirmed
|
delivery3b.confirmTo ! ConsumerController.Confirmed
|
||||||
val delivery4 = consumerProbes(1).receiveMessage()
|
val delivery3cor4 = consumerProbes(1).receiveMessage()
|
||||||
delivery4.message should ===(TestConsumer.Job("msg-4"))
|
delivery3cor4.message match {
|
||||||
|
case TestConsumer.Job("msg-3") =>
|
||||||
|
// It is possible the ProducerController re-sends msg-3 again before it has processed its acknowledgement.
|
||||||
|
// If the ConsumerController restarts between sending the acknowledgement and receiving that re-sent msg-3,
|
||||||
|
// it will deliver msg-3 a second time. We then expect msg-4 next:
|
||||||
|
val delivery4 = consumerProbes(1).receiveMessage()
|
||||||
|
delivery4.message should ===(TestConsumer.Job("msg-4"))
|
||||||
|
case TestConsumer.Job("msg-4") =>
|
||||||
|
// OK!
|
||||||
|
case other =>
|
||||||
|
throw new MatchError(other)
|
||||||
|
}
|
||||||
|
|
||||||
// redeliver also when no more messages are sent
|
// redeliver also when no more messages are sent
|
||||||
consumerProbes(1).stop()
|
consumerProbes(1).stop()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue