Merge pull request #18416 from akka/wip-18415-redeliver-patriknw
=per #18415 Don't start redelivery during recover
This commit is contained in:
commit
2c144f74ea
1 changed files with 12 additions and 7 deletions
|
|
@ -9,6 +9,7 @@ import scala.collection.immutable
|
|||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.actor.{ ActorSelection, Actor, ActorPath, NotInfluenceReceiveTimeout }
|
||||
import akka.persistence.serialization.Message
|
||||
import akka.actor.Cancellable
|
||||
|
||||
object AtLeastOnceDelivery {
|
||||
|
||||
|
|
@ -172,15 +173,18 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
|
|||
private val defaultMaxUnconfirmedMessages: Int =
|
||||
Persistence(context.system).settings.atLeastOnceDelivery.maxUnconfirmedMessages
|
||||
|
||||
private val redeliverTask = {
|
||||
import context.dispatcher
|
||||
val interval = redeliverInterval / 2
|
||||
context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)
|
||||
}
|
||||
// will be started after recovery completed
|
||||
private var redeliverTask: Option[Cancellable] = None
|
||||
|
||||
private var deliverySequenceNr = 0L
|
||||
private var unconfirmed = immutable.SortedMap.empty[Long, Delivery]
|
||||
|
||||
private def startRedeliverTask(): Unit = {
|
||||
val interval = redeliverInterval / 2
|
||||
redeliverTask = Some(
|
||||
context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)(context.dispatcher))
|
||||
}
|
||||
|
||||
private def nextDeliverySequenceNr(): Long = {
|
||||
deliverySequenceNr += 1
|
||||
deliverySequenceNr
|
||||
|
|
@ -323,7 +327,7 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
|
|||
* INTERNAL API
|
||||
*/
|
||||
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
|
||||
redeliverTask.cancel()
|
||||
redeliverTask.foreach(_.cancel())
|
||||
super.aroundPreRestart(reason, message)
|
||||
}
|
||||
|
||||
|
|
@ -331,12 +335,13 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
|
|||
* INTERNAL API
|
||||
*/
|
||||
override protected[akka] def aroundPostStop(): Unit = {
|
||||
redeliverTask.cancel()
|
||||
redeliverTask.foreach(_.cancel())
|
||||
super.aroundPostStop()
|
||||
}
|
||||
|
||||
override private[akka] def onReplaySuccess(): Unit = {
|
||||
redeliverOverdue()
|
||||
startRedeliverTask()
|
||||
super.onReplaySuccess()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue