From fdb0d9ee3bbf71c5e5dee67265d1cf2fbb4c3b23 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Tue, 29 Jul 2014 17:28:42 +0200 Subject: [PATCH] =per #15590 Increase efforts to in-order on start of ALOD AtLeastOnceDelivery can delivery out-of-order, and that's OK. Although, in the case of message replay followed by taking user land commands which may trigger `deliver` calls, it is nicer to at least once try to send the replayed but not confirmed deliveries *first*, before sending the completely new deliveries. This change acomplishes this by triggering redelivery explicitly when recovery has finished, and setting the timestamps on these messages a bit in the past, so they hit their redelivery deadline right away during this recovery induced redelivery. Resolves #15590 --- .../persistence/AtLeastOnceDelivery.scala | 19 ++++++++-- .../persistence/AtLeastOnceDeliverySpec.scala | 38 ++++++++++++++++++- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index 00fb4292d3..a85c5c0e2f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -3,6 +3,8 @@ */ package akka.persistence +import akka.persistence.JournalProtocol.ReplayMessagesSuccess + import scala.annotation.tailrec import scala.collection.breakOut import scala.collection.immutable @@ -189,8 +191,9 @@ trait AtLeastOnceDelivery extends Processor { s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]") val deliveryId = nextDeliverySequenceNr() - val now = System.nanoTime() + val now = if (recoveryRunning) { System.nanoTime() - redeliverInterval.toNanos } else System.nanoTime() val d = Delivery(destination, deliveryIdToMessage(deliveryId), now, attempt = 0) + if (recoveryRunning) unconfirmed = unconfirmed.updated(deliveryId, d) else @@ -219,14 +222,17 @@ trait AtLeastOnceDelivery extends Processor { val now = System.nanoTime() val deadline = now - redeliverInterval.toNanos var warnings = Vector.empty[UnconfirmedDelivery] + unconfirmed foreach { case (deliveryId, delivery) ⇒ if (delivery.timestamp <= deadline) { send(deliveryId, delivery, now) + if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts) warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message) } } + if (warnings.nonEmpty) self ! UnconfirmedWarning(warnings) } @@ -283,8 +289,15 @@ trait AtLeastOnceDelivery extends Processor { */ override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = message match { - case RedeliveryTick ⇒ redeliverOverdue() - case _ ⇒ super.aroundReceive(receive, message) + case ReplayMessagesSuccess ⇒ + redeliverOverdue() + super.aroundReceive(receive, message) + + case RedeliveryTick ⇒ + redeliverOverdue() + + case x ⇒ + super.aroundReceive(receive, message) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index b2aeaee4e3..c28a0a4b61 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -218,6 +218,41 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) probeA.expectNoMsg(1.second) } + "re-send replayed deliveries with an 'initially in-order' strategy, before delivering fresh messages" in { + val probeA = TestProbe() + val dst = system.actorOf(destinationProps(probeA.ref)) + val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + snd ! Req("a-1") + expectMsg(ReqAck) + probeA.expectMsg(Action(1, "a-1")) + + snd ! Req("a-2") + expectMsg(ReqAck) + // a-2 was lost + + snd ! Req("a-3") + expectMsg(ReqAck) + probeA.expectMsg(Action(3, "a-3")) + + snd ! Req("a-4") + expectMsg(ReqAck) + // a-4 was lost + + // trigger restart + snd ! Boom + snd ! Req("a-5") + expectMsg(ReqAck) + + // and then re-delivered + probeA.expectMsg(Action(2, "a-2")) // re-delivered + // a-4 was re-delivered but lost + probeA.expectMsg(Action(5, "a-5")) // re-delivered + probeA.expectMsg(Action(4, "a-4")) // re-delivered, 3rd time + + probeA.expectNoMsg(1.second) + } + "restore state from snapshot" in { val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) @@ -307,8 +342,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec( // TODO disable debug logging once happy with stability of this test - ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec") -) + ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec")) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))