pekko/akka-persistence/src/main/scala/akka/persistence/Channel.scala

440 lines
16 KiB
Scala
Raw Normal View History

akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.AkkaException
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
import akka.actor._
import akka.persistence.serialization.Message
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
/**
* A channel is used by [[Processor]]s for sending [[Persistent]] messages to destinations. The main
* responsibility of a channel is to prevent redundant delivery of replayed messages to destinations
* when a processor is recovered.
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
*
* A channel can be instructed to deliver a persistent message to a `destination` via the [[Deliver]]
* command.
*
* {{{
* class ForwardExample extends Processor {
* val destination = context.actorOf(Props[MyDestination])
* val channel = context.actorOf(Channel.props(), "myChannel")
*
* def receive = {
* case m @ Persistent(payload, _) => {
* // forward modified message to destination
* channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination)
* }
* }
* }
* }}}
*
* To reply to the sender of a persistent message, the `sender` reference should be used as channel
* destination.
*
* {{{
* class ReplyExample extends Processor {
* val channel = context.actorOf(Channel.props(), "myChannel")
*
* def receive = {
* case m @ Persistent(payload, _) => {
* // reply modified message to sender
* channel ! Deliver(m.withPayload(s"re: ${payload}"), sender)
* }
* }
* }
* }}}
*
* Redundant delivery of messages to destinations is only prevented if the receipt of these messages
* is explicitly confirmed. Therefore, persistent messages that are delivered via a channel are of type
* [[ConfirmablePersistent]]. Their receipt can be confirmed by a destination by calling the `confirm()`
* method on these messages.
*
* {{{
* class MyDestination extends Actor {
* def receive = {
* case cp @ ConfirmablePersistent(payload, sequenceNr) => cp.confirm()
* }
* }
* }}}
*
* A channel will only re-deliver messages if the sending processor is recovered and delivery of these
* messages has not been confirmed yet. Hence, a channel can be used to avoid message loss in case of
* sender JVM crashes, for example. A channel, however, does not attempt any re-deliveries should a
* destination be unavailable. Re-delivery to destinations (in case of network failures or destination
* JVM crashes) is an application-level concern and can be done by using a reliable proxy, for example.
*
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
* @see [[Deliver]]
*/
sealed class Channel private[akka] (_channelId: Option[String]) extends Actor with Stash {
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
private val extension = Persistence(context.system)
private val id = _channelId match {
case Some(cid) cid
case None extension.channelId(self)
}
import ResolvedDelivery._
private val delivering: Actor.Receive = {
case Deliver(persistent: PersistentRepr, destination, resolve) {
if (!persistent.confirms.contains(id)) {
val prepared = prepareDelivery(persistent)
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
resolve match {
case Resolve.Sender if !prepared.resolved {
context.actorOf(Props(classOf[ResolvedSenderDelivery], prepared, destination, sender)) ! DeliverResolved
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
context.become(buffering, false)
}
case Resolve.Destination if !prepared.resolved {
context.actorOf(Props(classOf[ResolvedDestinationDelivery], prepared, destination, sender)) ! DeliverResolved
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
context.become(buffering, false)
}
case _ destination tell (prepared, sender)
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
}
}
unstash()
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
}
}
private val buffering: Actor.Receive = {
case DeliveredResolved | DeliveredUnresolved { context.unbecome(); unstash() }
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
case _: Deliver stash()
}
def receive = delivering
private[akka] def prepareDelivery(persistent: PersistentRepr): PersistentRepr = {
ConfirmablePersistentImpl(
persistent = persistent,
confirmTarget = extension.journalFor(persistent.processorId),
confirmMessage = Confirm(persistent.processorId, persistent.sequenceNr, id))
}
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
}
object Channel {
/**
* Returns a channel configuration object for creating a [[Channel]] with a
* generated id.
*/
def props(): Props = Props(classOf[Channel], None)
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
/**
* Returns a channel configuration object for creating a [[Channel]] with the
* specified id.
*
* @param channelId channel id.
*/
def props(channelId: String): Props = Props(classOf[Channel], Some(channelId))
}
/**
* A [[PersistentChannel]] implements the same functionality as a [[Channel]] but additionally
* persists messages before they are delivered. Therefore, the main use case of a persistent
* channel is standalone usage i.e. independent of a sending [[Processor]]. Messages that have
* been persisted by a persistent channel are deleted again when destinations confirm the receipt
* of these messages.
*
* Using a persistent channel in combination with a [[Processor]] can make sense if destinations
* are unavailable for a long time and an application doesn't want to buffer all messages in
* memory (but write them to a journal instead). In this case, delivery can be disabled with
* [[DisableDelivery]] (to stop delivery and persist-only) and re-enabled with [[EnableDelivery]].
*
* A persistent channel can also be configured to reply whether persisting a message was successful
* or not (see `PersistentChannel.props` methods). If enabled, the sender will receive the persisted
* message as reply (i.e. a [[Persistent]] message), otherwise a [[PersistenceFailure]] message.
*
* A persistent channel will only re-deliver un-confirmed, stored messages if it is started or re-
* enabled with [[EnableDelivery]]. Hence, a persistent channel can be used to avoid message loss
* in case of sender JVM crashes, for example. A channel, however, does not attempt any re-deliveries
* should a destination be unavailable. Re-delivery to destinations (in case of network failures or
* destination JVM crashes) is an application-level concern and can be done by using a reliable proxy,
* for example.
*/
final class PersistentChannel private[akka] (_channelId: Option[String], persistentReply: Boolean) extends EventsourcedProcessor {
override val processorId = _channelId.getOrElse(super.processorId)
private val journal = Persistence(context.system).journalFor(processorId)
private val channel = context.actorOf(Props(classOf[NoPrepChannel], processorId))
private var deliveryEnabled = true
def receiveReplay: Receive = {
case Deliver(persistent: PersistentRepr, destination, resolve) deliver(prepareDelivery(persistent), destination, resolve)
}
def receiveCommand: Receive = {
case d @ Deliver(persistent: PersistentRepr, destination, resolve) {
if (!persistent.confirms.contains(processorId)) {
persist(d) { _
val prepared = prepareDelivery(persistent)
if (persistent.processorId != PersistentRepr.Undefined)
journal ! Confirm(persistent.processorId, persistent.sequenceNr, processorId)
if (persistentReply)
sender ! prepared
if (deliveryEnabled)
deliver(prepared, destination, resolve)
}
}
}
case c: Confirm deleteMessage(c.sequenceNr, true)
case DisableDelivery deliveryEnabled = false
case EnableDelivery if (!deliveryEnabled) throw new ChannelRestartRequiredException
case p: PersistenceFailure if (persistentReply) sender ! p
}
private def prepareDelivery(persistent: PersistentRepr): PersistentRepr = currentPersistentMessage.map { current
val sequenceNr = if (persistent.sequenceNr == 0L) current.sequenceNr else persistent.sequenceNr
val resolved = persistent.resolved && current.asInstanceOf[PersistentRepr].resolved
persistent.update(sequenceNr = sequenceNr, resolved = resolved)
} getOrElse (persistent)
private def deliver(persistent: PersistentRepr, destination: ActorRef, resolve: Resolve.ResolveStrategy) = currentPersistentMessage.foreach { current
channel forward Deliver(persistent = ConfirmablePersistentImpl(persistent,
confirmTarget = self,
confirmMessage = Confirm(processorId, current.sequenceNr, PersistentRepr.Undefined)), destination, resolve)
}
}
object PersistentChannel {
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with a
* generated id. The sender will not receive persistence completion replies.
*/
def props(): Props = props(persistentReply = false)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with a
* generated id.
*
* @param persistentReply if `true` the sender will receive the successfully stored
* [[Persistent]] message that has been submitted with a
* [[Deliver]] request, or a [[PersistenceFailure]] message
* in case of a persistence failure.
*/
def props(persistentReply: Boolean): Props = Props(classOf[PersistentChannel], None, persistentReply)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with the
* specified id. The sender will not receive persistence completion replies.
*
* @param channelId channel id.
*/
def props(channelId: String): Props = props(channelId, persistentReply = false)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with the
* specified id.
*
* @param channelId channel id.
* @param persistentReply if `true` the sender will receive the successfully stored
* [[Persistent]] message that has been submitted with a
* [[Deliver]] request, or a [[PersistenceFailure]] message
* in case of a persistence failure.
*/
def props(channelId: String, persistentReply: Boolean): Props = Props(classOf[PersistentChannel], Some(channelId), persistentReply)
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
}
/**
* Instructs a [[PersistentChannel]] to disable the delivery of [[Persistent]] messages to their destination.
* The persistent channel, however, continues to persist messages (for later delivery).
*
* @see [[EnableDelivery]]
*/
@SerialVersionUID(1L)
case object DisableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Instructs a [[PersistentChannel]] to re-enable the delivery of [[Persistent]] messages to their destination.
* This will first deliver all messages that have been stored by a persistent channel for which no confirmation
* is available yet. New [[Deliver]] requests are processed after all stored messages have been delivered. This
* request only has an effect if a persistent channel has previously been disabled with [[DisableDelivery]].
*
* @see [[DisableDelivery]]
*/
@SerialVersionUID(1L)
case object EnableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Thrown by a persistent channel when [[EnableDelivery]] has been requested and delivery has been previously
* disabled for that channel.
*/
@SerialVersionUID(1L)
class ChannelRestartRequiredException extends AkkaException("channel restart required for enabling delivery")
/**
* Instructs a [[Channel]] or [[PersistentChannel]] to deliver `persistent` message to
* destination `destination`. The `resolve` parameter can be:
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
*
* - `Resolve.Destination`: will resolve a new destination reference from the specified
* `destination`s path. The `persistent` message will be sent to the newly resolved
* destination.
* - `Resolve.Sender`: will resolve a new sender reference from this `Deliver` message's
* `sender` path. The `persistent` message will be sent to the specified `destination`
* using the newly resolved sender.
* - `Resolve.Off`: will not do any resolution (default).
*
* Resolving an actor reference means first obtaining an `ActorSelection` from the path of
* the reference to be resolved and then obtaining a new actor reference via an `Identify`
* - `ActorIdentity` conversation. Actor reference resolution does not change the original
* order of messages.
*
* Resolving actor references may become necessary when using the stored sender references
* of replayed messages. A stored sender reference may become invalid (for example, it may
* reference a previous sender incarnation, after a JVM restart). Depending on how a processor
* uses sender references, two resolution strategies are relevant.
*
* - `Resolve.Sender` when a processor forwards a replayed message to a destination.
*
* {{{
* channel forward Deliver(message, destination, Resolve.Sender)
* }}}
*
* - `Resolve.Destination` when a processor replies to the sender of a replayed message. In
* this case the sender is used as channel destination.
*
* {{{
* channel ! Deliver(message, sender, Resolve.Destination)
* }}}
*
* A destination or sender reference will only be resolved by a channel if
*
* - the `resolve` parameter is set to `Resolve.Destination` or `Resolve.Channel`
* - the message is replayed
* - the message is not retained by the channel and
* - there was no previous successful resolve action for that message
*
* @param persistent persistent message.
* @param destination persistent message destination.
* @param resolve resolve strategy.
*/
@SerialVersionUID(1L)
case class Deliver(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy = Resolve.Off) extends Message
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
object Deliver {
/**
* Java API.
*/
def create(persistent: Persistent, destination: ActorRef) = Deliver(persistent, destination)
/**
* Java API.
*/
def create(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy) = Deliver(persistent, destination, resolve)
}
/**
* Actor reference resolution strategy.
*
* @see [[Deliver]]
*/
object Resolve {
sealed abstract class ResolveStrategy
/**
* No resolution.
*/
@SerialVersionUID(1L)
case object Off extends ResolveStrategy
/**
* [[Channel]] should resolve the `sender` of a [[Deliver]] message.
*/
@SerialVersionUID(1L)
case object Sender extends ResolveStrategy
/**
* [[Channel]] should resolve the `destination` of a [[Deliver]] message.
*/
@SerialVersionUID(1L)
case object Destination extends ResolveStrategy
/**
* Java API.
*/
def off() = Off
/**
* Java API.
*/
def sender() = Sender
/**
* Java API.
*/
def destination() = Destination
}
/**
* Resolved delivery support.
*/
private trait ResolvedDelivery extends Actor {
import scala.concurrent.duration._
import scala.language.postfixOps
import ResolvedDelivery._
context.setReceiveTimeout(5 seconds) // TODO: make configurable
def path: ActorPath
def onResolveSuccess(ref: ActorRef): Unit
def onResolveFailure(): Unit
def receive = {
case DeliverResolved context.actorSelection(path) ! Identify(1)
case ActorIdentity(1, Some(ref)) { onResolveSuccess(ref); shutdown(DeliveredResolved) }
case ActorIdentity(1, None) { onResolveFailure(); shutdown(DeliveredUnresolved) }
case ReceiveTimeout { onResolveFailure(); shutdown(DeliveredUnresolved) }
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
}
def shutdown(message: Any) {
context.parent ! message
context.stop(self)
}
}
private object ResolvedDelivery {
case object DeliverResolved
case object DeliveredResolved
case object DeliveredUnresolved
}
/**
* Resolves `destination` before sending `persistent` message to the resolved destination using
* the specified sender (`sdr`) as message sender.
*/
private class ResolvedDestinationDelivery(persistent: PersistentRepr, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
val path = destination.path
def onResolveSuccess(ref: ActorRef) = ref tell (persistent.update(resolved = true), sdr)
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
def onResolveFailure() = destination tell (persistent, sdr)
}
/**
* Resolves `sdr` before sending `persistent` message to specified `destination` using
* the resolved sender as message sender.
*/
private class ResolvedSenderDelivery(persistent: PersistentRepr, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
val path = sdr.path
def onResolveSuccess(ref: ActorRef) = destination tell (persistent.update(resolved = true), ref)
akka-persistence prototype The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
2013-09-14 14:19:18 +02:00
def onResolveFailure() = destination tell (persistent, sdr)
}
/**
* [[Channel]] specialization used by [[PersistentChannel]] to deliver stored messages.
*/
private class NoPrepChannel(channelId: String) extends Channel(Some(channelId)) {
override private[akka] def prepareDelivery(persistent: PersistentRepr) = persistent
}