From 10c0d643815904e593b31600dba4e02cd1f27006 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 May 2015 15:18:22 +0200 Subject: [PATCH] =per #17150 Harden AtLeastOnceDeliverySpec * UnconfirmedWarning message spilled over from previous test step, using separate probes instead of testActor * the resend interval was too short so that unexpected resend occured as seen by the `failed: expected Action(4,a-4), found Action(3,a-3)` --- .../persistence/AtLeastOnceDeliverySpec.scala | 154 +++++++++--------- 1 file changed, 81 insertions(+), 73 deletions(-) diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index d8657e2441..01b7857f92 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -149,37 +149,39 @@ object AtLeastOnceDeliverySpec { } -abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { +abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec { import akka.persistence.AtLeastOnceDeliverySpec._ "AtLeastOnceDelivery" must { - "deliver messages in order when nothing is lost" in { + "deliver messages in order when nothing is lost" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) - snd ! Req("a") - expectMsg(ReqAck) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + snd.tell(Req("a"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a")) probeA.expectNoMsg(1.second) } - "re-deliver lost messages" in { + "re-deliver lost messages" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) - snd ! Req("a-1") - expectMsg(ReqAck) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + snd.tell(Req("a-1"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) - snd ! Req("a-2") - expectMsg(ReqAck) + snd.tell(Req("a-2"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(2, "a-2")) - snd ! Req("a-3") - snd ! Req("a-4") - expectMsg(ReqAck) - expectMsg(ReqAck) + snd.tell(Req("a-3"), probe.ref) + snd.tell(Req("a-4"), probe.ref) + probe.expectMsg(ReqAck) + probe.expectMsg(ReqAck) // a-3 was lost probeA.expectMsg(Action(4, "a-4")) // and then re-delivered @@ -187,64 +189,66 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) probeA.expectNoMsg(1.second) } - "re-deliver lost messages after restart" in { + "re-deliver lost messages after restart" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) - snd ! Req("a-1") - expectMsg(ReqAck) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + snd.tell(Req("a-1"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) - snd ! Req("a-2") - expectMsg(ReqAck) + snd.tell(Req("a-2"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(2, "a-2")) - snd ! Req("a-3") - snd ! Req("a-4") - expectMsg(ReqAck) - expectMsg(ReqAck) + snd.tell(Req("a-3"), probe.ref) + snd.tell(Req("a-4"), probe.ref) + probe.expectMsg(ReqAck) + probe.expectMsg(ReqAck) // a-3 was lost probeA.expectMsg(Action(4, "a-4")) // trigger restart - snd ! Boom + snd.tell(Boom, probe.ref) // and then re-delivered probeA.expectMsg(Action(3, "a-3")) - snd ! Req("a-5") - expectMsg(ReqAck) + snd.tell(Req("a-5"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(5, "a-5")) probeA.expectNoMsg(1.second) } - "re-send replayed deliveries with an 'initially in-order' strategy, before delivering fresh messages" in { + "re-send replayed deliveries with an 'initially in-order' strategy, before delivering fresh messages" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) - snd ! Req("a-1") - expectMsg(ReqAck) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + snd.tell(Req("a-1"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) - snd ! Req("a-2") - expectMsg(ReqAck) + snd.tell(Req("a-2"), probe.ref) + probe.expectMsg(ReqAck) // a-2 was lost - snd ! Req("a-3") - expectMsg(ReqAck) + snd.tell(Req("a-3"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(3, "a-3")) - snd ! Req("a-4") - expectMsg(ReqAck) + snd.tell(Req("a-4"), probe.ref) + probe.expectMsg(ReqAck) // a-4 was lost // trigger restart - snd ! Boom - snd ! Req("a-5") - expectMsg(ReqAck) + snd.tell(Boom, probe.ref) + snd.tell(Req("a-5"), probe.ref) + probe.expectMsg(ReqAck) // and then re-delivered probeA.expectMsg(Action(2, "a-2")) // re-delivered @@ -255,55 +259,57 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) probeA.expectNoMsg(1.second) } - "restore state from snapshot" in { + "restore state from snapshot" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = false, destinations), name) - snd ! Req("a-1") - expectMsg(ReqAck) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = false, destinations), name) + snd.tell(Req("a-1"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) - snd ! Req("a-2") - expectMsg(ReqAck) + snd.tell(Req("a-2"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(2, "a-2")) - snd ! Req("a-3") - snd ! Req("a-4") - snd ! SaveSnap - expectMsg(ReqAck) - expectMsg(ReqAck) + snd.tell(Req("a-3"), probe.ref) + snd.tell(Req("a-4"), probe.ref) + snd.tell(SaveSnap, probe.ref) + probe.expectMsg(ReqAck) + probe.expectMsg(ReqAck) // a-3 was lost probeA.expectMsg(Action(4, "a-4")) // after snapshot succeeded - expectMsgType[SaveSnapshotSuccess] + probe.expectMsgType[SaveSnapshotSuccess] // trigger restart - snd ! Boom + snd.tell(Boom, probe.ref) // and then re-delivered probeA.expectMsg(Action(3, "a-3")) - snd ! Req("a-5") - expectMsg(ReqAck) + snd.tell(Req("a-5"), probe.ref) + probe.expectMsg(ReqAck) probeA.expectMsg(Action(5, "a-5")) probeA.expectNoMsg(1.second) } - "warn about unconfirmed messages" in { + "warn about unconfirmed messages" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val probeB = TestProbe() val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, 1000, async = false, destinations), name) - snd ! Req("a-1") - snd ! Req("b-1") - snd ! Req("b-2") - expectMsg(ReqAck) - expectMsg(ReqAck) - expectMsg(ReqAck) - val unconfirmed = receiveWhile(3.seconds) { + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 3, 1000, async = false, destinations), name) + snd.tell(Req("a-1"), probe.ref) + snd.tell(Req("b-1"), probe.ref) + snd.tell(Req("b-2"), probe.ref) + probe.expectMsg(ReqAck) + probe.expectMsg(ReqAck) + probe.expectMsg(ReqAck) + val unconfirmed = probe.receiveWhile(5.seconds) { case UnconfirmedWarning(unconfirmed) ⇒ unconfirmed }.flatten unconfirmed.map(_.destination).toSet should ===(Set(probeA.ref.path, probeB.ref.path)) @@ -311,7 +317,8 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) system.stop(snd) } - "re-deliver many lost messages" in { + "re-deliver many lost messages" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val probeB = TestProbe() val probeC = TestProbe() @@ -322,16 +329,16 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) "A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path, "B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path, "C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path) - val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = true, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 1000, async = true, destinations), name) val N = 100 for (n ← 1 to N) { - snd ! Req("a-" + n) + snd.tell(Req("a-" + n), probe.ref) } for (n ← 1 to N) { - snd ! Req("b-" + n) + snd.tell(Req("b-" + n), probe.ref) } for (n ← 1 to N) { - snd ! Req("c-" + n) + snd.tell(Req("c-" + n), probe.ref) } val deliverWithin = 20.seconds probeA.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should ===((1 to N).map(n ⇒ "a-" + n).toSet) @@ -339,16 +346,17 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) probeC.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should ===((1 to N).map(n ⇒ "c-" + n).toSet) } - "limit the number of messages redelivered at once" in { + "limit the number of messages redelivered at once" taggedAs (TimingTest) in { + val probe = TestProbe() val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 2, async = true, destinations), name) + val snd = system.actorOf(senderProps(probe.ref, name, 1000.millis, 5, 2, async = true, destinations), name) val N = 10 for (n ← 1 to N) { - snd ! Req("a-" + n) + snd.tell(Req("a-" + n), probe.ref) } // initially all odd messages should go through