2013-09-14 14:19:18 +02:00
|
|
|
/**
|
2014-02-02 19:05:45 -06:00
|
|
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
2013-09-14 14:19:18 +02:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.persistence
|
|
|
|
|
|
2014-01-17 06:58:25 +01:00
|
|
|
import java.lang.{ Iterable ⇒ JIterable }
|
|
|
|
|
|
2013-12-06 12:48:44 +01:00
|
|
|
import scala.collection.immutable
|
2014-01-17 06:58:25 +01:00
|
|
|
import scala.collection.JavaConverters._
|
2013-12-06 12:48:44 +01:00
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
import scala.language.postfixOps
|
|
|
|
|
|
2013-09-14 14:19:18 +02:00
|
|
|
import akka.actor._
|
|
|
|
|
|
2013-11-07 10:45:02 +01:00
|
|
|
import akka.persistence.serialization.Message
|
2014-01-17 06:58:25 +01:00
|
|
|
import akka.persistence.JournalProtocol._
|
2013-11-07 10:45:02 +01:00
|
|
|
|
2013-12-06 12:48:44 +01:00
|
|
|
/**
 * Settings that control how a [[Channel]] redelivers unconfirmed messages.
 *
 * Instances are immutable; the `with...` methods return new instances.
 *
 * @param redeliverMax Maximum number of redelivery attempts.
 * @param redeliverInterval Interval between redelivery attempts.
 * @param redeliverFailureListener Receiver of [[RedeliverFailure]] notifications, which are sent when the number
 *                                 of redeliveries reaches `redeliverMax` for a sequence of messages. The listener
 *                                 can enforce a redelivery of these messages by restarting the sending processor,
 *                                 or it can confirm them to prevent further redeliveries.
 */
@SerialVersionUID(1L)
final case class ChannelSettings(
  redeliverMax: Int = 5,
  redeliverInterval: FiniteDuration = 5.seconds,
  redeliverFailureListener: Option[ActorRef] = None) {

  /**
   * Java API: returns settings equal to these, except that the given `redeliverMax` is used.
   */
  def withRedeliverMax(redeliverMax: Int): ChannelSettings =
    ChannelSettings(redeliverMax, this.redeliverInterval, this.redeliverFailureListener)

  /**
   * Java API: returns settings equal to these, except that the given `redeliverInterval` is used.
   */
  def withRedeliverInterval(redeliverInterval: FiniteDuration): ChannelSettings =
    ChannelSettings(this.redeliverMax, redeliverInterval, this.redeliverFailureListener)

  /**
   * Java API: returns settings equal to these, except that the given actor is used as redeliver
   * failure listener. Passing `null` results in settings without a listener.
   */
  def withRedeliverFailureListener(redeliverFailureListener: ActorRef): ChannelSettings =
    ChannelSettings(this.redeliverMax, this.redeliverInterval, Option(redeliverFailureListener))
}
|
|
|
|
|
|
|
|
|
|
object ChannelSettings {
  /**
   * Java API: creates [[ChannelSettings]] with the default value for every parameter.
   *
   * @return settings equal to `ChannelSettings()`.
   */
  def create(): ChannelSettings = ChannelSettings()
}
|
|
|
|
|
|
2013-09-14 14:19:18 +02:00
|
|
|
/**
 * [[Processor]]s (and [[View]]s) use a channel to send [[Persistent]] messages to destinations.
 * Its main responsibility is to prevent that replayed messages are redundantly delivered to
 * destinations when a processor is recovered.
 *
 * Delivery of a persistent message is requested with the [[Deliver]] command. The destination is
 * given as `ActorPath` and the message is sent via the `ActorSelection` of that path.
 *
 * {{{
 * class ForwardExample extends Processor {
 *   val destination = context.actorOf(Props[MyDestination])
 *   val channel = context.actorOf(Channel.props(), "myChannel")
 *
 *   def receive = {
 *     case m @ Persistent(payload, _) =>
 *       // forward modified message to destination
 *       channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination.path)
 *   }
 * }
 * }}}
 *
 * A reply to the sender of a persistent message is made by using the `sender` reference as
 * channel destination.
 *
 * {{{
 * class ReplyExample extends Processor {
 *   val channel = context.actorOf(Channel.props(), "myChannel")
 *
 *   def receive = {
 *     case m @ Persistent(payload, _) =>
 *       // reply modified message to sender
 *       channel ! Deliver(m.withPayload(s"re: ${payload}"), sender.path)
 *   }
 * }
 * }}}
 *
 * Redundant delivery is only prevented for messages whose receipt has been explicitly confirmed.
 * For that reason a channel delivers persistent messages as [[ConfirmablePersistent]] messages,
 * and a destination confirms their receipt by calling `confirm()` on them.
 *
 * {{{
 * class MyDestination extends Actor {
 *   def receive = {
 *     case cp @ ConfirmablePersistent(payload, sequenceNr, redeliveries) => cp.confirm()
 *   }
 * }
 * }}}
 *
 * A `ConfirmablePersistent` message that is not confirmed by its destination is redelivered by the
 * channel according to the parameters in [[ChannelSettings]]. The `redeliveries` value of a
 * redelivered message is greater than zero.
 *
 * Messages for which the maximum number of redeliveries has been reached are removed from the
 * channel, and the `redeliverFailureListener` (if specified, see [[ChannelSettings]]) is notified
 * about them with a [[RedeliverFailure]] message. Besides other application-specific tasks, this
 * listener can restart the sending processor to enforce a redelivery of these messages, or confirm
 * them to prevent further redeliveries.
 *
 * @see [[Deliver]]
 */
final class Channel private[akka] (_channelId: Option[String], channelSettings: ChannelSettings) extends Actor {
  import channelSettings._

  // An explicitly given id wins; otherwise the id is derived from this actor by the extension.
  private val id = _channelId.getOrElse(Persistence(context.system).channelId(self))

  // Target of the confirmation messages attached to outgoing messages.
  private val journal = Persistence(context.system).confirmationBatchingJournalForChannel(id)

  // Child actor that performs the actual (re)delivery.
  private val delivery = context.actorOf(Props(classOf[ReliableDelivery], channelSettings))

  def receive: Receive = {
    case d @ Deliver(persistent: PersistentRepr, _) ⇒
      // Messages that this channel has already seen confirmed are dropped silently.
      val alreadyConfirmed = persistent.confirms.contains(id)
      if (!alreadyConfirmed) {
        delivery forward d.copy(persistent = prepareDelivery(persistent))
      }
    case d: RedeliverFailure ⇒
      redeliverFailureListener foreach { listener ⇒ listener ! d }
    case d: Delivered ⇒
      delivery forward d
  }

  /**
   * Wraps `persistent` into a confirmable message whose confirmation is a [[DeliveredByChannel]]
   * that is sent to the confirmation batching journal of this channel.
   */
  private def prepareDelivery(persistent: PersistentRepr): PersistentRepr = {
    val confirmation = DeliveredByChannel(persistent.processorId, id, persistent.sequenceNr, channel = self)
    ConfirmablePersistentImpl(persistent, confirmTarget = journal, confirmMessage = confirmation)
  }
}
|
|
|
|
|
|
|
|
|
|
object Channel {
  /**
   * Creates the actor configuration object for a [[Channel]] that has a generated id and
   * uses default [[ChannelSettings]].
   */
  def props(): Props =
    propsFor(None, ChannelSettings())

  /**
   * Creates the actor configuration object for a [[Channel]] that has a generated id and
   * uses the specified `channelSettings`.
   *
   * @param channelSettings channel configuration object.
   */
  def props(channelSettings: ChannelSettings): Props =
    propsFor(None, channelSettings)

  /**
   * Creates the actor configuration object for a [[Channel]] that has the specified id and
   * uses default [[ChannelSettings]].
   *
   * @param channelId channel id.
   */
  def props(channelId: String): Props =
    propsFor(Some(channelId), ChannelSettings())

  /**
   * Creates the actor configuration object for a [[Channel]] that has the specified id and
   * uses the specified `channelSettings`.
   *
   * @param channelId channel id.
   * @param channelSettings channel configuration object.
   */
  def props(channelId: String, channelSettings: ChannelSettings): Props =
    propsFor(Some(channelId), channelSettings)

  // Single place where the constructor arguments of Channel are assembled.
  private def propsFor(channelId: Option[String], channelSettings: ChannelSettings): Props =
    Props(classOf[Channel], channelId, channelSettings)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Command that asks a [[Channel]] or [[PersistentChannel]] to deliver a `persistent`
 * message to a `destination`.
 *
 * @param persistent persistent message.
 * @param destination persistent message destination.
 */
@SerialVersionUID(1L)
final case class Deliver(
  persistent: Persistent,
  destination: ActorPath) extends Message
|
2013-09-14 14:19:18 +02:00
|
|
|
|
|
|
|
|
object Deliver {
  /**
   * Java API: creates a [[Deliver]] command.
   *
   * @param persistent persistent message.
   * @param destination persistent message destination.
   * @return a command equal to `Deliver(persistent, destination)`.
   */
  def create(persistent: Persistent, destination: ActorPath): Deliver =
    Deliver(persistent, destination)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Plugin API: confirmation message that receivers of [[ConfirmablePersistent]] messages
 * generate by calling `ConfirmablePersistent.confirm()`.
 */
trait Delivered extends Message {
  /** Id of the channel that the confirmation refers to. */
  def channelId: String

  /** Sequence number of the confirmed persistent message. */
  def persistentSequenceNr: Long

  /** Sequence number of the delivery that is confirmed. */
  def deliverySequenceNr: Long

  /** Channel actor that the confirmation refers to. */
  def channel: ActorRef

  /**
   * INTERNAL API.
   *
   * Returns a confirmation with the given delivery sequence number and channel. Arguments
   * that are omitted default to the current values of this confirmation.
   */
  private[persistence] def update(deliverySequenceNr: Long = deliverySequenceNr, channel: ActorRef = channel): Delivered
}
|
2013-09-14 14:19:18 +02:00
|
|
|
|
2014-01-17 06:58:25 +01:00
|
|
|
/**
 * Plugin API: [[Delivered]] confirmation for a message that was delivered by a [[Channel]].
 *
 * @param processorId id of the processor that the confirmed message belongs to.
 * @param channelId id of the delivering channel.
 * @param persistentSequenceNr sequence number of the confirmed persistent message.
 * @param deliverySequenceNr sequence number of the confirmed delivery, `0L` if not set.
 * @param channel delivering channel actor, `null` if not set.
 */
final case class DeliveredByChannel(
  processorId: String,
  channelId: String,
  persistentSequenceNr: Long,
  deliverySequenceNr: Long = 0L,
  channel: ActorRef = null) extends Delivered with PersistentConfirmation {

  /** Same value as `persistentSequenceNr`. */
  def sequenceNr: Long =
    persistentSequenceNr

  /** Returns this confirmation with `deliverySequenceNr` and `channel` replaced. */
  def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByChannel =
    DeliveredByChannel(processorId, channelId, persistentSequenceNr, deliverySequenceNr, channel)
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API.
 *
 * Sits in front of `journal` and writes [[DeliveredByChannel]] confirmations in batches, using
 * `WriteConfirmations` requests. While a write is outstanding, further confirmations are
 * accumulated; they are written when the journal responds, or earlier if the accumulated batch
 * reaches the configured maximum confirmation batch size. Every other message is forwarded to
 * `journal` unchanged.
 */
private[persistence] class DeliveredByChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor {
  private val publish = settings.internal.publishConfirmations
  private val batchMax = settings.journal.maxConfirmationBatchSize

  // true while a WriteConfirmations request is waiting for its response
  private var batching = false
  // confirmations that have not been sent to the journal yet
  private var batch = Vector.empty[DeliveredByChannel]

  def receive = {
    case WriteConfirmationsSuccess(confirmations) ⇒
      // The next batch is sent to the journal before the written confirmations are announced.
      journalResponded()
      confirmations.foreach { c ⇒
        val dbc = c.asInstanceOf[DeliveredByChannel]
        Option(dbc.channel).foreach(_ ! c)
        if (publish) {
          context.system.eventStream.publish(c)
        }
      }
    case WriteConfirmationsFailure(_) ⇒
      journalResponded()
    case d: DeliveredByChannel ⇒
      addToBatch(d)
      val writeNow = !batching || maxBatchSizeReached
      if (writeNow) journalBatch()
    case m ⇒
      journal forward m
  }

  /** Appends `pc` to the batch of confirmations that are waiting to be written. */
  def addToBatch(pc: DeliveredByChannel): Unit =
    batch :+= pc

  /** Whether the waiting batch has reached the configured maximum size. */
  def maxBatchSizeReached: Boolean =
    batchMax <= batch.length

  /** Sends the waiting batch to the journal, starts a new empty batch and marks a write as outstanding. */
  def journalBatch(): Unit = {
    val pending = batch
    journal ! WriteConfirmations(pending, self)
    batch = Vector.empty
    batching = true
  }

  // After a journal response: write what has accumulated meanwhile, or return to idle.
  private def journalResponded(): Unit =
    if (batch.nonEmpty) journalBatch() else batching = false
}
|
2013-09-14 14:19:18 +02:00
|
|
|
|
2014-01-17 06:58:25 +01:00
|
|
|
/**
 * Notification sent to channel listeners. It carries the messages for which the maximum
 * number of redeliveries has been reached.
 *
 * @param messages messages that have reached the maximum number of redeliveries.
 */
final case class RedeliverFailure(messages: immutable.Seq[ConfirmablePersistent]) {
  /**
   * Java API: the `messages` of this notification as a Java `Iterable`.
   */
  def getMessages: JIterable[ConfirmablePersistent] = {
    messages.asJava
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Reliably delivers messages contained in [[Deliver]] requests to their destinations. Delivery
 * attempts that stay unconfirmed for longer than `redeliverInterval` are handed over to a child
 * [[Redelivery]] actor, which redelivers them according to the parameters in [[ChannelSettings]].
 *
 * @param redeliverSettings settings of the channel this actor delivers for.
 */
private class ReliableDelivery(redeliverSettings: ChannelSettings) extends Actor {
  import redeliverSettings._
  import ReliableDelivery._

  private val redelivery = context.actorOf(Props(classOf[Redelivery], redeliverSettings))
  // First delivery attempts that are not confirmed yet, keyed (and ordered) by delivery sequence number.
  private var deliveryAttempts: DeliveryAttempts = immutable.SortedMap.empty
  private var deliverySequenceNr: Long = 0L

  def receive = {
    case Deliver(persistent: ConfirmablePersistentImpl, destination) ⇒
      val dsnr = nextDeliverySequenceNr()
      val psnr = persistent.sequenceNr
      val confirm = persistent.confirmMessage.update(deliverySequenceNr = dsnr)
      // A message without a sequence number of its own (0) gets the delivery sequence number.
      val updated = persistent.update(confirmMessage = confirm, sequenceNr = if (psnr == 0) dsnr else psnr)
      context.actorSelection(destination).tell(updated, sender())
      deliveryAttempts += (dsnr -> DeliveryAttempt(updated, destination, sender()))
    case d: Delivered ⇒
      deliveryAttempts -= d.deliverySequenceNr
      redelivery forward d
    case Redeliver ⇒
      // System.nanoTime values are only meaningful as differences: comparing two readings with
      // `<` gives wrong results when the counter overflows, so the age of each attempt is
      // computed by subtraction and compared with the redelivery interval.
      val now = System.nanoTime
      val maxAge = redeliverInterval.toNanos
      val (older, younger) = deliveryAttempts.span { case (_, a) ⇒ now - a.timestamp > maxAge }
      redelivery ! Redeliver(older, redeliverMax)
      deliveryAttempts = younger
  }

  /** Increments the delivery sequence number and returns the new value. */
  private def nextDeliverySequenceNr(): Long = {
    deliverySequenceNr += 1
    deliverySequenceNr
  }
}
|
|
|
|
|
|
2013-12-06 12:48:44 +01:00
|
|
|
private object ReliableDelivery {
  /** Delivery attempts keyed (and ordered) by delivery sequence number. */
  type DeliveryAttempts = immutable.SortedMap[Long, DeliveryAttempt]

  /** Messages for which no further redelivery is attempted. */
  type FailedAttempts = Vector[ConfirmablePersistentImpl]

  /**
   * A message sent to `destination` on behalf of `sender`.
   *
   * @param persistent message that was sent.
   * @param destination path the message was sent to.
   * @param sender sender used for the message.
   * @param timestamp `System.nanoTime` reading, by default taken when the attempt is created.
   */
  final case class DeliveryAttempt(
    persistent: ConfirmablePersistentImpl,
    destination: ActorPath,
    sender: ActorRef,
    timestamp: Long = System.nanoTime) {

    /**
     * Returns this attempt with the redelivery count of its message incremented by one.
     * Destination, sender and timestamp are kept.
     */
    def incrementRedeliveryCount: DeliveryAttempt = {
      val redelivered = persistent.update(redeliveries = persistent.redeliveries + 1)
      copy(persistent = redelivered)
    }
  }

  /**
   * Request to redeliver `attempts`. The companion object of this class is itself used as the
   * scheduled message that triggers a redelivery round.
   */
  final case class Redeliver(attempts: DeliveryAttempts, redeliveryMax: Int)
}
|
|
|
|
|
|
2013-11-07 10:45:02 +01:00
|
|
|
/**
 * Redelivery process used by [[ReliableDelivery]].
 *
 * Keeps the delivery attempts handed over by the parent, re-sends them whenever a `Redeliver`
 * request arrives, and reports messages whose redelivery count has reached `redeliverMax` to their
 * channel with a [[RedeliverFailure]]. It also schedules the `Redeliver` trigger that is sent to
 * the parent after every `redeliverInterval`.
 *
 * @param redeliverSettings settings of the channel this actor redelivers for.
 */
private class Redelivery(redeliverSettings: ChannelSettings) extends Actor {
  import context.dispatcher
  import redeliverSettings._
  import ReliableDelivery._

  // Attempts that were redelivered at least once and are not confirmed yet.
  private var redeliveryAttempts: DeliveryAttempts = immutable.SortedMap.empty
  // Handle of the pending trigger; None until the first trigger has been scheduled, so that
  // postStop never dereferences an uninitialised (null) reference.
  private var redeliverySchedule: Option[Cancellable] = None

  def receive = {
    // The limit carried by the request is not used; `redeliverMax` of this actor's settings applies.
    case Redeliver(as, _) ⇒
      val noAttempts: DeliveryAttempts = immutable.SortedMap.empty
      val noFailures: FailedAttempts = Vector.empty
      val (attempts, failed) = (redeliveryAttempts ++ as).foldLeft((noAttempts, noFailures)) {
        case ((pending, exhausted), (dsnr, attempt)) ⇒
          val persistent = attempt.persistent
          if (persistent.redeliveries >= redeliverMax) {
            // Limit reached: drop the attempt and remember its message for the failure report.
            (pending, exhausted :+ persistent)
          } else {
            val updated = attempt.incrementRedeliveryCount
            context.actorSelection(updated.destination).tell(updated.persistent, updated.sender)
            (pending.updated(dsnr, updated), exhausted)
          }
      }
      redeliveryAttempts = attempts
      scheduleRedelivery()
      // One notification for all failed messages, sent to the channel of the first of them.
      failed.headOption.foreach(_.confirmMessage.channel ! RedeliverFailure(failed))
    case c: Delivered ⇒
      redeliveryAttempts -= c.deliverySequenceNr
  }

  override def preStart(): Unit =
    scheduleRedelivery()

  override def postStop(): Unit =
    redeliverySchedule.foreach(_.cancel())

  /** Schedules a single `Redeliver` trigger for the parent after `redeliverInterval`. */
  private def scheduleRedelivery(): Unit =
    redeliverySchedule = Some(context.system.scheduler.scheduleOnce(redeliverInterval, context.parent, Redeliver))
}
|
2013-12-06 12:48:44 +01:00
|
|
|
|