2013-09-14 14:19:18 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.persistence
|
|
|
|
|
|
|
|
|
|
import akka.actor._
|
|
|
|
|
import akka.dispatch._
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
|
|
|
|
|
*
|
|
|
|
|
* {{{
|
|
|
|
|
* import akka.persistence.{ Persistent, Processor }
|
|
|
|
|
*
|
|
|
|
|
* class MyProcessor extends Processor {
|
|
|
|
|
* def receive = {
|
|
|
|
|
* case Persistent(payload, sequenceNr) => // message has been written to journal
|
|
|
|
|
* case other => // message has not been written to journal
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
*
|
|
|
|
|
* val processor = actorOf(Props[MyProcessor], name = "myProcessor")
|
|
|
|
|
*
|
|
|
|
|
* processor ! Persistent("foo")
|
|
|
|
|
* processor ! "bar"
|
|
|
|
|
* }}}
|
|
|
|
|
*
|
|
|
|
|
*
|
|
|
|
|
* During start and restart, persistent messages are replayed to a processor so that it can recover internal
|
|
|
|
|
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
|
|
|
|
|
* messages, hence applications don't need to wait for a processor to complete its recovery.
|
|
|
|
|
*
|
2013-09-26 09:14:43 +02:00
|
|
|
* Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life
|
|
|
|
|
* cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by
|
|
|
|
|
* sending it a [[Recover]] message.
|
2013-09-14 14:19:18 +02:00
|
|
|
*
|
|
|
|
|
* [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence
|
2013-09-26 09:14:43 +02:00
|
|
|
 * starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message.
|
2013-09-14 14:19:18 +02:00
|
|
|
*
|
|
|
|
|
* During recovery, a processor internally buffers new messages until recovery completes, so that new messages
|
|
|
|
|
* do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the
|
|
|
|
|
* ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the
|
|
|
|
|
* ''user stash'' for stashing/unstashing both persistent and transient messages.
|
|
|
|
|
*
|
2013-10-08 11:46:02 +02:00
|
|
|
* Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved
|
|
|
|
|
* snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any,
|
|
|
|
|
* that are younger than the snapshot. Default is to offer the latest saved snapshot.
|
|
|
|
|
*
|
2013-09-14 14:19:18 +02:00
|
|
|
* @see [[UntypedProcessor]]
|
2013-10-08 11:46:02 +02:00
|
|
|
* @see [[Recover]]
|
2013-10-27 08:01:14 +01:00
|
|
|
* @see [[PersistentBatch]]
|
2013-09-14 14:19:18 +02:00
|
|
|
*/
|
2013-11-07 10:45:02 +01:00
|
|
|
trait Processor extends Actor with Stash with StashFactory {
|
2013-10-08 11:46:02 +02:00
|
|
|
import JournalProtocol._
|
|
|
|
|
import SnapshotProtocol._
|
2013-09-14 14:19:18 +02:00
|
|
|
|
|
|
|
|
// Persistence extension of the actor system this processor runs in.
private val extension = Persistence(context.system)

// Default processor id, obtained once from the extension for this actor reference.
private val _processorId = extension.processorId(self)
|
|
|
|
|
|
|
|
|
|
/**
 * Behavior of a processor in one phase of its life cycle. Each state decides how an
 * incoming message is treated.
 */
private trait State {
  /**
   * Handles `message` according to the rules of this state.
   *
   * @param receive the actor behavior to which processed messages are handed.
   * @param message the message to handle.
   */
  def aroundReceive(receive: Actor.Receive, message: Any): Unit

  /** Applies `receive` to `message`; falls back to `unhandled` if `receive` is not defined for it. */
  protected def process(receive: Actor.Receive, message: Any): Unit = {
    receive.applyOrElse(message, unhandled)
  }

  /**
   * Same as `process`, but executed inside `withCurrentPersistent`, i.e. with `persistent`
   * registered as the current persistent message for the duration of the call.
   */
  protected def processPersistent(receive: Actor.Receive, persistent: Persistent): Unit = {
    withCurrentPersistent(persistent) { current ⇒
      receive.applyOrElse(current, unhandled)
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Initial state. Waits for a `Recover` request; when it arrives, the state changes to
 * `recoveryStarted` and the snapshot store is asked to load a snapshot. Every other
 * message is put on the processor stash.
 */
private val recoveryPending = new State {
  override def toString: String = "recovery pending"

  def aroundReceive(receive: Actor.Receive, message: Any): Unit =
    message match {
      case Recover(fromSnapshot, toSequenceNr) ⇒
        // Switch state first so that the snapshot store's reply is handled by recoveryStarted.
        _currentState = recoveryStarted
        snapshotStore ! LoadSnapshot(processorId, fromSnapshot, toSequenceNr)
      case _ ⇒
        processorStash.stash()
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Processes a loaded snapshot and replayed messages, if any. If processing of the loaded
 * snapshot fails, the exception is thrown immediately. If processing of a replayed message
 * fails, the exception is caught and stored for being thrown later and state is changed to
 * `recoveryFailed`.
 *
 * Messages handled in this state:
 *  - `LoadSnapshotResult`: offers the snapshot (if any) to `receive` and requests a replay
 *    of the messages following it (or of all messages, starting at `1L`, if there is none).
 *  - `Replayed`: hands the replayed persistent message to `receive`.
 *  - `ReplaySuccess`: changes to `recoverySucceeded` and unstashes the processor stash.
 *  - `ReplayFailure`: offers a `RecoveryFailure` to `receive`, or throws a
 *    `RecoveryFailureException` if `receive` does not handle it.
 *  - `Recover`: ignored.
 *  - anything else: put on the processor stash.
 */
private val recoveryStarted = new State {
  override def toString: String = "recovery started"

  def aroundReceive(receive: Actor.Receive, message: Any): Unit = message match {
    case LoadSnapshotResult(sso, toSnr) ⇒ sso match {
      case Some(SelectedSnapshot(metadata, snapshot)) ⇒
        // The snapshot represents all messages up to metadata.sequenceNr. Advance
        // lastSequenceNr accordingly, otherwise it stays at 0L when no younger messages
        // are replayed and a subsequent saveSnapshot would record a wrong sequence number.
        if (metadata.sequenceNr > _lastSequenceNr) _lastSequenceNr = metadata.sequenceNr
        process(receive, SnapshotOffer(metadata, snapshot))
        journal ! Replay(metadata.sequenceNr + 1L, toSnr, processorId, self)
      case None ⇒
        journal ! Replay(1L, toSnr, processorId, self)
    }
    case ReplaySuccess(maxSnr) ⇒
      _currentState = recoverySucceeded
      _sequenceNr = maxSnr
      processorStash.unstashAll()
    case ReplayFailure(cause) ⇒
      val notification = RecoveryFailure(cause)
      if (receive.isDefinedAt(notification)) process(receive, notification)
      else {
        val errorMsg = s"Replay failure by journal (processor id = [${processorId}])"
        throw new RecoveryFailureException(errorMsg, cause)
      }
    case Replayed(p) ⇒
      try processPersistent(receive, p) catch {
        case t: Throwable ⇒
          _currentState = recoveryFailed // delay throwing exception to prepareRestart
          _recoveryFailureCause = t
          _recoveryFailureMessage = currentEnvelope
      }
    case _: Recover ⇒ // ignore
    case _ ⇒ processorStash.stash()
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * State after successful recovery: journals and processes new messages, both persistent
 * and transient. New messages are forwarded to the journal first and are handed to
 * `receive` only when the journal's reply (`WriteSuccess`, `LoopSuccess`) arrives.
 */
private val recoverySucceeded = new State {
  override def toString: String = "recovery finished"

  def aroundReceive(receive: Actor.Receive, message: Any): Unit =
    message match {
      case _: Recover ⇒ // ignore
      case Replayed(persistent) ⇒
        processPersistent(receive, persistent) // can occur after unstash from user stash
      case WriteSuccess(persistent) ⇒
        processPersistent(receive, persistent)
      case WriteFailure(persistent, cause) ⇒
        val failure = PersistenceFailure(persistent.payload, persistent.sequenceNr, cause)
        if (!receive.isDefinedAt(failure)) {
          // The processor does not handle the failure notification: kill it.
          val errorMsg = "Processor killed after persistence failure " +
            s"(processor id = [${processorId}], sequence nr = [${persistent.sequenceNr}], payload class = [${persistent.payload.getClass.getName}]). " +
            "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages."
          throw new ActorKilledException(errorMsg)
        }
        process(receive, failure)
      case LoopSuccess(looped) ⇒
        process(receive, looped)
      case repr: PersistentRepr ⇒
        // Stamp the message with this processor's id and the next sequence number before writing.
        journal forward Write(
          repr.update(processorId = processorId, sequenceNr = nextSequenceNr()),
          self)
      case batch: PersistentBatch ⇒
        journal forward WriteBatch(
          batch.persistentReprList.map(_.update(processorId = processorId, sequenceNr = nextSequenceNr())),
          self)
      case other ⇒
        journal forward Loop(other, self)
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * State entered after a replayed message caused an exception. Consumes the remaining
 * replayed messages (only tracking their sequence numbers) and, once the replay ends,
 * changes to `prepareRestart`. The message that caused the exception during replay is
 * re-added to the front of the mailbox and re-received in `prepareRestart`.
 */
private val recoveryFailed = new State {
  override def toString: String = "recovery failed"

  def aroundReceive(receive: Actor.Receive, message: Any): Unit =
    message match {
      case ReplaySuccess(_) | ReplayFailure(_) ⇒
        _currentState = prepareRestart
        mailbox.enqueueFirst(self, _recoveryFailureMessage)
      case Replayed(skipped) ⇒
        updateLastSequenceNr(skipped)
      case _: Recover ⇒ // ignore
      case _ ⇒
        processorStash.stash()
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Final state of a failed recovery. Re-receives the replayed message that caused an
 * exception during replay and throws the stored exception. All other messages are ignored.
 */
private val prepareRestart = new State {
  override def toString: String = "prepare restart"

  def aroundReceive(receive: Actor.Receive, message: Any): Unit =
    message match {
      case Replayed(_) ⇒
        throw _recoveryFailureCause
      case _ ⇒ // ignore
    }
}
|
|
|
|
|
|
|
|
|
|
// State currently handling incoming messages; a processor starts in recoveryPending.
private var _currentState: State = recoveryPending

// Sequence number most recently assigned to a written message (see nextSequenceNr).
private var _sequenceNr: Long = 0L
// Highest sequence number of the persistent messages seen so far (see updateLastSequenceNr).
private var _lastSequenceNr: Long = 0L

// Persistent message being processed right now, null if there is none.
private var _currentPersistent: Persistent = _

// Exception thrown while processing a replayed message, and the envelope of that message.
private var _recoveryFailureCause: Throwable = _
private var _recoveryFailureMessage: Envelope = _

// Looked up lazily by processor id -- presumably so that an overridden processorId is
// already in effect at first use (NOTE(review): confirm).
private lazy val journal = extension.journalFor(processorId)
private lazy val snapshotStore = extension.snapshotStoreFor(processorId)
|
2013-09-14 14:19:18 +02:00
|
|
|
|
|
|
|
|
/**
 * Id of this processor. Defaults to this processor's path and can be overridden.
 * The journal and the snapshot store of this processor are looked up by this id.
 */
def processorId: String =
  _processorId
|
|
|
|
|
|
|
|
|
|
/**
 * Highest sequence number received so far, or `0L` if this processor hasn't received a
 * persistent message yet. Usually equal to the sequence number of `currentPersistentMessage`
 * (unless a processor implementation is about to re-order persistent messages using
 * `stash()` and `unstash()`).
 */
def lastSequenceNr: Long =
  _lastSequenceNr
|
|
|
|
|
|
|
|
|
|
/**
 * Returns `true` if this processor is currently recovering, i.e. if its current state is
 * either `recoveryStarted` or `prepareRestart`.
 */
def recoveryRunning: Boolean = {
  val state = _currentState
  state == recoveryStarted || state == prepareRestart
}
|
|
|
|
|
|
|
|
|
|
/**
 * Returns `true` if this processor has successfully finished recovery, i.e. if its current
 * state is `recoverySucceeded`.
 */
def recoveryFinished: Boolean = {
  _currentState == recoverySucceeded
}
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the persistent message that is currently being processed, or `None` if there
 * is none.
 */
implicit def currentPersistentMessage: Option[Persistent] = {
  // _currentPersistent is null outside of withCurrentPersistent; Option(...) maps null to None.
  Option(_currentPersistent)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Marks the `persistent` message as deleted. A message marked as deleted is not replayed during
 * recovery. This method is usually called inside `preRestart` when a persistent message
 * caused an exception. Processors that want to re-receive that persistent message during recovery
 * should not call this method.
 *
 * @param persistent persistent message to be marked as deleted.
 * @throws IllegalArgumentException if `persistent` message has not been persisted by this
 *                                  processor.
 */
def deleteMessage(persistent: Persistent): Unit = {
  deleteMessage(persistent, physical = false)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Deletes a `persistent` message. If `physical` is set to `false`, the persistent
 * message is marked as deleted in the journal, otherwise it is physically deleted from the
 * journal. A deleted message is not replayed during recovery. This method is usually called
 * inside `preRestart` when a persistent message caused an exception. Processors that
 * want to re-receive that persistent message during recovery should not call this method.
 *
 * @param persistent persistent message to be deleted.
 * @param physical if `false`, the message is marked as deleted, otherwise it is
 *                 physically deleted.
 * @throws IllegalArgumentException if `persistent` message has not been persisted by this
 *                                  processor.
 */
def deleteMessage(persistent: Persistent, physical: Boolean): Unit = persistent match {
  case impl: PersistentRepr if impl.processorId == processorId ⇒
    deleteMessage(impl.sequenceNr, physical)
  case impl: PersistentRepr ⇒
    throw new IllegalArgumentException(
      s"persistent message to be deleted (processor id = [${impl.processorId}], sequence number = [${impl.sequenceNr}]) " +
        s"has not been persisted by this processor (processor id = [${processorId}])")
  case other ⇒
    // Not a journal representation (or null): it cannot have been persisted by this processor.
    // Report it with the documented exception type rather than a ClassCastException.
    throw new IllegalArgumentException(
      s"persistent message to be deleted [${other}] " +
        s"has not been persisted by this processor (processor id = [${processorId}])")
}
|
|
|
|
|
|
|
|
|
|
/**
 * Deletes a persistent message identified by `sequenceNr`. If `physical` is set to `false`,
 * the persistent message is marked as deleted in the journal, otherwise it is physically
 * deleted from the journal. A deleted message is not replayed during recovery. This method
 * is usually called inside `preRestart` when a persistent message caused an exception.
 * Processors that want to re-receive that persistent message during recovery should not call
 * this method.
 *
 * The request is sent to the journal as a `Delete` message for this processor's id.
 *
 * @param sequenceNr sequence number of the persistent message to be deleted.
 * @param physical if `false`, the message is marked as deleted, otherwise it is physically deleted.
 */
def deleteMessage(sequenceNr: Long, physical: Boolean): Unit = {
  journal ! Delete(processorId, sequenceNr, physical)
}
|
|
|
|
|
|
2013-09-26 09:14:43 +02:00
|
|
|
/**
 * Saves a `snapshot` of this processor's state, tagged with this processor's id and its
 * current `lastSequenceNr`. If saving succeeds, this processor will receive a
 * [[SaveSnapshotSuccess]] message, otherwise a [[SaveSnapshotFailure]] message.
 *
 * @param snapshot the state to be saved.
 */
def saveSnapshot(snapshot: Any): Unit =
  snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot)
|
|
|
|
|
|
2013-09-14 14:19:18 +02:00
|
|
|
/**
 * INTERNAL API.
 *
 * Runs `body` with `persistent` registered as the current persistent message. The last
 * sequence number is updated before `body` is called, and the current persistent message
 * is cleared again afterwards, whether or not `body` throws.
 */
protected[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = {
  try {
    _currentPersistent = persistent
    updateLastSequenceNr(persistent)
    body(persistent)
  } finally {
    _currentPersistent = null
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API.
 *
 * Raises the last sequence number to the sequence number of `persistent` if that one is
 * higher; the last sequence number is never decreased.
 *
 * @param persistent the persistent message whose sequence number is taken into account.
 */
protected[persistence] def updateLastSequenceNr(persistent: Persistent): Unit = {
  if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API.
 *
 * Delegates every incoming message to the handler of the current processor state.
 */
override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit =
  _currentState.aroundReceive(receive, message)
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API.
 *
 * Calls the user-overridable `preStart()` and afterwards, even if that call throws,
 * `super.preStart()`.
 */
final override protected[akka] def aroundPreStart(): Unit = {
  try {
    preStart()
  } finally {
    super.preStart()
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API.
 *
 * Unstashes the user stash through `unstashFilterPredicate` and afterwards, even if
 * unstashing throws, calls `postStop()`.
 */
final override protected[akka] def aroundPostStop(): Unit = {
  try {
    unstashAll(unstashFilterPredicate)
  } finally {
    postStop()
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API.
 *
 * Unstashes the user stash (through `unstashFilterPredicate`) and the processor stash, then
 * calls `preRestartDefault`, even if unstashing throws. If the failed message was wrapped in
 * a `WriteSuccess`, `LoopSuccess` or `Replayed`, the wrapped message is passed on; in every
 * other case `None` is passed.
 */
final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
  try {
    unstashAll(unstashFilterPredicate)
    processorStash.unstashAll()
  } finally {
    val unwrapped: Option[Any] = message collect {
      case WriteSuccess(m) ⇒ m
      case LoopSuccess(m)  ⇒ m
      case Replayed(m)     ⇒ m
    }
    preRestartDefault(reason, unwrapped)
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * User-overridable callback, called when a processor is started. The default implementation
 * triggers recovery by sending a `Recover()` message to `self`.
 */
@throws(classOf[Exception])
override def preStart(): Unit =
  self ! Recover()
|
|
|
|
|
|
|
|
|
|
/**
 * User-overridable callback. Called before a processor is restarted. Default implementation sends
 * a `Recover(toSequenceNr = lastSequenceNr)` message to `self` if `message` is defined,
 * `Recover()` otherwise.
 *
 * @param reason the exception that caused the restart.
 * @param message the message that was being processed when the failure occurred, if any.
 */
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  val request =
    if (message.isDefined) Recover(toSequenceNr = lastSequenceNr) // recover up to the last received message
    else Recover()
  self ! request
}
|
|
|
|
|
|
|
|
|
|
/**
 * Calls [[preRestart]] and then, even if that call throws, `super.preRestart()`. If processor
 * implementation classes want to opt out from stopping child actors, they should override this
 * method and call [[preRestart]] only.
 */
def preRestartDefault(reason: Throwable, message: Option[Any]): Unit = {
  try {
    preRestart(reason, message)
  } finally {
    super.preRestart(reason, message)
  }
}
|
|
|
|
|
|
|
|
|
|
// Increments the sequence number counter and returns the incremented value.
private def nextSequenceNr(): Long = {
  val next = _sequenceNr + 1L
  _sequenceNr = next
  next
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------------
|
|
|
|
|
// Processor-internal stash
|
|
|
|
|
// -----------------------------------------------------
|
|
|
|
|
|
2013-10-15 09:01:07 +02:00
|
|
|
// Predicate passed to unstashAll on stop and restart: false for WriteSuccess and Replayed
// messages, true for everything else.
private val unstashFilterPredicate: Any ⇒ Boolean = {
  case _: WriteSuccess | _: Replayed ⇒ false
  case _                             ⇒ true
}
|
|
|
|
|
|
2013-11-07 10:45:02 +01:00
|
|
|
// Stash used internally to buffer messages while recovery is pending or running;
// created separately from the user stash inherited from Stash.
private val processorStash = createStash()
|
2013-09-14 14:19:18 +02:00
|
|
|
|
2013-10-15 09:01:07 +02:00
|
|
|
// Envelope of the message currently being processed, read from the underlying ActorCell.
private def currentEnvelope: Envelope = {
  val cell = context.asInstanceOf[ActorCell]
  cell.currentMessage
}
|
|
|
|
|
}
|
2013-09-14 14:19:18 +02:00
|
|
|
|
2013-10-15 09:01:07 +02:00
|
|
|
/**
 * Notification sent to a [[Processor]] when a journal failed to write a [[Persistent]]
 * message. A processor that does not handle this message throws an
 * `akka.actor.ActorKilledException`.
 *
 * @param payload payload of the persistent message that could not be written.
 * @param sequenceNr sequence number of the persistent message that could not be written.
 * @param cause failure cause.
 */
case class PersistenceFailure(
  payload: Any,
  sequenceNr: Long,
  cause: Throwable)
|
2013-09-14 14:19:18 +02:00
|
|
|
|
|
|
|
|
/**
|
2013-11-07 10:45:02 +01:00
|
|
|
* Java API: an actor that persists (journals) messages of type [[Persistent]]. Messages of other types
|
|
|
|
|
* are not persisted.
|
2013-09-14 14:19:18 +02:00
|
|
|
*
|
|
|
|
|
* {{{
|
|
|
|
|
* import akka.persistence.Persistent;
|
|
|
|
|
 * import akka.persistence.UntypedProcessor;
|
|
|
|
|
*
|
|
|
|
|
* class MyProcessor extends UntypedProcessor {
|
|
|
|
|
* public void onReceive(Object message) throws Exception {
|
|
|
|
|
* if (message instanceof Persistent) {
|
|
|
|
|
* // message has been written to journal
|
|
|
|
|
* Persistent persistent = (Persistent)message;
|
|
|
|
|
* Object payload = persistent.payload();
|
|
|
|
|
* Long sequenceNr = persistent.sequenceNr();
|
|
|
|
|
* // ...
|
|
|
|
|
* } else {
|
|
|
|
|
* // message has not been written to journal
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
*
|
|
|
|
|
* // ...
|
|
|
|
|
*
|
|
|
|
|
* ActorRef processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor");
|
|
|
|
|
*
|
|
|
|
|
* processor.tell(Persistent.create("foo"), null);
|
|
|
|
|
* processor.tell("bar", null);
|
|
|
|
|
* }}}
|
|
|
|
|
*
|
|
|
|
|
* During start and restart, persistent messages are replayed to a processor so that it can recover internal
|
|
|
|
|
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
|
|
|
|
|
* messages, hence applications don't need to wait for a processor to complete its recovery.
|
|
|
|
|
*
|
2013-09-26 09:14:43 +02:00
|
|
|
* Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life
|
|
|
|
|
* cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by
|
|
|
|
|
* sending it a [[Recover]] message.
|
2013-09-14 14:19:18 +02:00
|
|
|
*
|
|
|
|
|
* [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence
|
2013-09-26 09:14:43 +02:00
|
|
|
* starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message.
|
2013-09-14 14:19:18 +02:00
|
|
|
*
|
|
|
|
|
* During recovery, a processor internally buffers new messages until recovery completes, so that new messages
|
|
|
|
|
* do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the
|
|
|
|
|
* ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the
|
|
|
|
|
* ''user stash'' for stashing/unstashing both persistent and transient messages.
|
|
|
|
|
*
|
2013-10-08 11:46:02 +02:00
|
|
|
* Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved
|
|
|
|
|
* snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any,
|
|
|
|
|
* that are younger than the snapshot. Default is to offer the latest saved snapshot.
|
|
|
|
|
*
|
2013-09-14 14:19:18 +02:00
|
|
|
* @see [[Processor]]
|
2013-10-08 11:46:02 +02:00
|
|
|
* @see [[Recover]]
|
2013-10-27 08:01:14 +01:00
|
|
|
* @see [[PersistentBatch]]
|
2013-09-14 14:19:18 +02:00
|
|
|
*/
|
|
|
|
|
abstract class UntypedProcessor extends UntypedActor with Processor {
  /**
   * Java API: returns the current persistent message or `null` if there is none.
   *
   * @return the persistent message currently being processed, or `null`.
   */
  def getCurrentPersistentMessage: Persistent = currentPersistentMessage.orNull
}
|