=per #17150 Harden AtLeastOnceDeliverySpec

* UnconfirmedWarning message spilled over from previous test step,
  using separate probes instead of testActor
* the resend interval was too short so that unexpected resend occured
  as seen by the `failed: expected Action(4,a-4), found Action(3,a-3)`
This commit is contained in:
Patrik Nordwall 2015-05-13 15:18:22 +02:00
parent c68ebc6d5a
commit 10c0d64381

View file

@ -149,37 +149,39 @@ object AtLeastOnceDeliverySpec {
}
abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec {
import akka.persistence.AtLeastOnceDeliverySpec._
"AtLeastOnceDelivery" must {
"deliver messages in order when nothing is lost" in {
"deliver messages in order when nothing is lost" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
snd ! Req("a")
expectMsg(ReqAck)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd.tell(Req("a"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a"))
probeA.expectNoMsg(1.second)
}
"re-deliver lost messages" in {
"re-deliver lost messages" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
snd.tell(Req("a-2"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
expectMsg(ReqAck)
expectMsg(ReqAck)
snd.tell(Req("a-3"), probe.ref)
snd.tell(Req("a-4"), probe.ref)
probe.expectMsg(ReqAck)
probe.expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// and then re-delivered
@ -187,64 +189,66 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
probeA.expectNoMsg(1.second)
}
"re-deliver lost messages after restart" in {
"re-deliver lost messages after restart" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
snd.tell(Req("a-2"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
expectMsg(ReqAck)
expectMsg(ReqAck)
snd.tell(Req("a-3"), probe.ref)
snd.tell(Req("a-4"), probe.ref)
probe.expectMsg(ReqAck)
probe.expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// trigger restart
snd ! Boom
snd.tell(Boom, probe.ref)
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-5")
expectMsg(ReqAck)
snd.tell(Req("a-5"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(5, "a-5"))
probeA.expectNoMsg(1.second)
}
"re-send replayed deliveries with an 'initially in-order' strategy, before delivering fresh messages" in {
"re-send replayed deliveries with an 'initially in-order' strategy, before delivering fresh messages" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
snd.tell(Req("a-2"), probe.ref)
probe.expectMsg(ReqAck)
// a-2 was lost
snd ! Req("a-3")
expectMsg(ReqAck)
snd.tell(Req("a-3"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-4")
expectMsg(ReqAck)
snd.tell(Req("a-4"), probe.ref)
probe.expectMsg(ReqAck)
// a-4 was lost
// trigger restart
snd ! Boom
snd ! Req("a-5")
expectMsg(ReqAck)
snd.tell(Boom, probe.ref)
snd.tell(Req("a-5"), probe.ref)
probe.expectMsg(ReqAck)
// and then re-delivered
probeA.expectMsg(Action(2, "a-2")) // re-delivered
@ -255,55 +259,57 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
probeA.expectNoMsg(1.second)
}
"restore state from snapshot" in {
"restore state from snapshot" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name)
snd.tell(Req("a-1"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
snd.tell(Req("a-2"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
snd ! SaveSnap
expectMsg(ReqAck)
expectMsg(ReqAck)
snd.tell(Req("a-3"), probe.ref)
snd.tell(Req("a-4"), probe.ref)
snd.tell(SaveSnap, probe.ref)
probe.expectMsg(ReqAck)
probe.expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// after snapshot succeeded
expectMsgType[SaveSnapshotSuccess]
probe.expectMsgType[SaveSnapshotSuccess]
// trigger restart
snd ! Boom
snd.tell(Boom, probe.ref)
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-5")
expectMsg(ReqAck)
snd.tell(Req("a-5"), probe.ref)
probe.expectMsg(ReqAck)
probeA.expectMsg(Action(5, "a-5"))
probeA.expectNoMsg(1.second)
}
"warn about unconfirmed messages" in {
"warn about unconfirmed messages" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val probeB = TestProbe()
val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, 1000, async = false, destinations), name)
snd ! Req("a-1")
snd ! Req("b-1")
snd ! Req("b-2")
expectMsg(ReqAck)
expectMsg(ReqAck)
expectMsg(ReqAck)
val unconfirmed = receiveWhile(3.seconds) {
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 3, 1000, async = false, destinations), name)
snd.tell(Req("a-1"), probe.ref)
snd.tell(Req("b-1"), probe.ref)
snd.tell(Req("b-2"), probe.ref)
probe.expectMsg(ReqAck)
probe.expectMsg(ReqAck)
probe.expectMsg(ReqAck)
val unconfirmed = probe.receiveWhile(5.seconds) {
case UnconfirmedWarning(unconfirmed) unconfirmed
}.flatten
unconfirmed.map(_.destination).toSet should ===(Set(probeA.ref.path, probeB.ref.path))
@ -311,7 +317,8 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
system.stop(snd)
}
"re-deliver many lost messages" in {
"re-deliver many lost messages" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val probeB = TestProbe()
val probeC = TestProbe()
@ -322,16 +329,16 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
"A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path,
"B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path,
"C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path)
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = true, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = true, destinations), name)
val N = 100
for (n 1 to N) {
snd ! Req("a-" + n)
snd.tell(Req("a-" + n), probe.ref)
}
for (n 1 to N) {
snd ! Req("b-" + n)
snd.tell(Req("b-" + n), probe.ref)
}
for (n 1 to N) {
snd ! Req("c-" + n)
snd.tell(Req("c-" + n), probe.ref)
}
val deliverWithin = 20.seconds
probeA.receiveN(N, deliverWithin).map { case a: Action a.payload }.toSet should ===((1 to N).map(n "a-" + n).toSet)
@ -339,16 +346,17 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
probeC.receiveN(N, deliverWithin).map { case a: Action a.payload }.toSet should ===((1 to N).map(n "c-" + n).toSet)
}
"limit the number of messages redelivered at once" in {
"limit the number of messages redelivered at once" taggedAs (TimingTest) in {
val probe = TestProbe()
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 2, async = true, destinations), name)
val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 2, async = true, destinations), name)
val N = 10
for (n 1 to N) {
snd ! Req("a-" + n)
snd.tell(Req("a-" + n), probe.ref)
}
// initially all odd messages should go through