!per #3704 Persistence improvements

- Channel enhancements (#3773):
- Live read models (#3776):
- Batch-oriented journal plugin API (#3804):
- Batching of confirmations and deletions
- Message deletion enhancements (more efficient range deletions)
This commit is contained in:
Martin Krasser 2014-01-17 06:58:25 +01:00
parent 32b76adb9a
commit f327e1e357
55 changed files with 3474 additions and 2191 deletions

View file

@ -4,8 +4,7 @@
package akka.persistence
import scala.annotation.tailrec
import akka.AkkaException
import akka.actor._
import akka.dispatch._
@ -28,7 +27,6 @@ import akka.dispatch._
* processor ! "bar"
* }}}
*
*
* During start and restart, persistent messages are replayed to a processor so that it can recover internal
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
* messages, hence applications don't need to wait for a processor to complete its recovery.
@ -53,97 +51,41 @@ import akka.dispatch._
* @see [[Recover]]
* @see [[PersistentBatch]]
*/
trait Processor extends Actor with Stash with StashFactory {
trait Processor extends Actor with Recovery {
import JournalProtocol._
import SnapshotProtocol._
private val extension = Persistence(context.system)
private val _processorId = extension.processorId(self)
import extension.maxBatchSize
/**
* Processor state.
* Processes the highest stored sequence number response from the journal and then switches
* to `processing` state.
*/
private trait State {
/**
* State-specific message handler.
*/
def aroundReceive(receive: Actor.Receive, message: Any): Unit
private val initializing = new State {
override def toString: String = "initializing"
protected def process(receive: Actor.Receive, message: Any) =
receive.applyOrElse(message, unhandled)
protected def processPersistent(receive: Actor.Receive, persistent: Persistent) =
withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled))
}
/**
* Initial state, waits for `Recover` request, then changes to `recoveryStarted`.
*/
private val recoveryPending = new State {
override def toString: String = "recovery pending"
def aroundReceive(receive: Actor.Receive, message: Any): Unit = message match {
case Recover(fromSnap, toSnr)
_currentState = recoveryStarted
snapshotStore ! LoadSnapshot(processorId, fromSnap, toSnr)
case _ processorStash.stash()
}
}
/**
* Processes a loaded snapshot and replayed messages, if any. If processing of the loaded
* snapshot fails, the exception is thrown immediately. If processing of a replayed message
* fails, the exception is caught and stored for being thrown later and state is changed to
* `recoveryFailed`.
*/
private val recoveryStarted = new State {
override def toString: String = "recovery started"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case LoadSnapshotResult(sso, toSnr) sso match {
case Some(SelectedSnapshot(metadata, snapshot))
process(receive, SnapshotOffer(metadata, snapshot))
journal ! Replay(metadata.sequenceNr + 1L, toSnr, processorId, self)
case None
journal ! Replay(1L, toSnr, processorId, self)
}
case ReplaySuccess(maxSnr)
_currentState = recoverySucceeded
_sequenceNr = maxSnr
processorStash.unstashAll()
case ReplayFailure(cause)
val notification = RecoveryFailure(cause)
if (receive.isDefinedAt(notification)) process(receive, notification)
else {
val errorMsg = s"Replay failure by journal (processor id = [${processorId}])"
throw new RecoveryFailureException(errorMsg, cause)
}
case Replayed(p) try { processPersistent(receive, p) } catch {
case t: Throwable
_currentState = recoveryFailed // delay throwing exception to prepareRestart
_recoveryFailureCause = t
_recoveryFailureMessage = currentEnvelope
}
case r: Recover // ignore
case _ processorStash.stash()
def aroundReceive(receive: Receive, message: Any) = message match {
case ReadHighestSequenceNrSuccess(highest)
_currentState = processing
sequenceNr = highest
receiverStash.unstashAll()
case ReadHighestSequenceNrFailure(cause)
onRecoveryFailure(receive, cause)
case other
receiverStash.stash()
}
}
/**
* Journals and processes new messages, both persistent and transient.
*/
private val recoverySucceeded = new State {
override def toString: String = "recovery finished"
private val processing = new State {
override def toString: String = "processing"
private var batching = false
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case r: Recover // ignore
case Replayed(p) processPersistent(receive, p) // can occur after unstash from user stash
case WriteSuccess(p) processPersistent(receive, p)
case WriteFailure(p, cause)
def aroundReceive(receive: Receive, message: Any) = message match {
case r: Recover // ignore
case ReplayedMessage(p) processPersistent(receive, p) // can occur after unstash from user stash
case WriteMessageSuccess(p) processPersistent(receive, p)
case WriteMessageFailure(p, cause)
val notification = PersistenceFailure(p.payload, p.sequenceNr, cause)
if (receive.isDefinedAt(notification)) process(receive, notification)
else {
@ -152,8 +94,8 @@ trait Processor extends Actor with Stash with StashFactory {
"To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages."
throw new ActorKilledException(errorMsg)
}
case LoopSuccess(m) process(receive, m)
case WriteBatchSuccess | WriteBatchFailure(_)
case LoopMessageSuccess(m) process(receive, m)
case WriteMessagesSuccess | WriteMessagesFailure(_)
if (processorBatch.isEmpty) batching = false else journalBatch()
case p: PersistentRepr
addToBatch(p)
@ -166,7 +108,7 @@ trait Processor extends Actor with Stash with StashFactory {
case m
// submit all batched messages before looping this message
if (processorBatch.isEmpty) batching = false else journalBatch()
journal forward Loop(m, self)
journal forward LoopMessage(m, self)
}
def addToBatch(p: PersistentRepr): Unit =
@ -176,67 +118,49 @@ trait Processor extends Actor with Stash with StashFactory {
pb.persistentReprList.foreach(addToBatch)
def maxBatchSizeReached: Boolean =
processorBatch.length >= maxBatchSize
processorBatch.length >= extension.settings.journal.maxMessageBatchSize
def journalBatch(): Unit = {
journal ! WriteBatch(processorBatch, self)
journal ! WriteMessages(processorBatch, self)
processorBatch = Vector.empty
batching = true
}
}
/**
* Consumes remaining replayed messages and then changes to `prepareRestart`. The
* message that caused the exception during replay, is re-added to the mailbox and
* re-received in `prepareRestart`.
* INTERNAL API.
*
* Switches to `initializing` state and requests the highest stored sequence number from the journal.
*/
private val recoveryFailed = new State {
override def toString: String = "recovery failed"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case ReplayFailure(_)
replayCompleted()
// journal couldn't tell the maximum stored sequence number, hence the next
// replay must be a full replay (up to the highest stored sequence number)
_lastSequenceNr = Long.MaxValue
case ReplaySuccess(_) replayCompleted()
case Replayed(p) updateLastSequenceNr(p)
case r: Recover // ignore
case _ processorStash.stash()
}
def replayCompleted(): Unit = {
_currentState = prepareRestart
mailbox.enqueueFirst(self, _recoveryFailureMessage)
}
private[persistence] def onReplaySuccess(receive: Receive, awaitReplay: Boolean): Unit = {
_currentState = initializing
journal ! ReadHighestSequenceNr(lastSequenceNr, processorId, self)
}
/**
* Re-receives the replayed message that causes an exception during replay and throws
* that exception.
* INTERNAL API.
*/
private val prepareRestart = new State {
override def toString: String = "prepare restart"
private[persistence] def onReplayFailure(receive: Receive, awaitReplay: Boolean, cause: Throwable): Unit =
onRecoveryFailure(receive, cause)
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case Replayed(_) throw _recoveryFailureCause
case _ // ignore
/**
* Invokes this processor's behavior with a `RecoveryFailure` message, if handled, otherwise throws a
* `RecoveryFailureException`.
*/
private def onRecoveryFailure(receive: Receive, cause: Throwable): Unit = {
val notification = RecoveryFailure(cause)
if (receive.isDefinedAt(notification)) {
receive(notification)
} else {
val errorMsg = s"Recovery failure by journal (processor id = [${processorId}])"
throw new RecoveryException(errorMsg, cause)
}
}
private val _processorId = extension.processorId(self)
private var processorBatch = Vector.empty[PersistentRepr]
private var _sequenceNr: Long = 0L
private var _lastSequenceNr: Long = 0L
private var _currentPersistent: Persistent = _
private var _currentState: State = recoveryPending
private var _recoveryFailureCause: Throwable = _
private var _recoveryFailureMessage: Envelope = _
private lazy val journal = extension.journalFor(processorId)
private lazy val snapshotStore = extension.snapshotStoreFor(processorId)
private var sequenceNr: Long = 0L
/**
* Processor id. Defaults to this processor's path and can be overridden.
@ -244,30 +168,21 @@ trait Processor extends Actor with Stash with StashFactory {
def processorId: String = _processorId
/**
* Highest received sequence number so far or `0L` if this processor hasn't received
* a persistent message yet. Usually equal to the sequence number of `currentPersistentMessage`
* (unless a processor implementation is about to re-order persistent messages using
* `stash()` and `unstash()`).
* Returns `processorId`.
*/
def lastSequenceNr: Long = _lastSequenceNr
def snapshotterId: String = processorId
/**
* Returns `true` if this processor is currently recovering.
*/
def recoveryRunning: Boolean =
_currentState == recoveryStarted ||
_currentState == prepareRestart
_currentState != processing
/**
* Returns `true` if this processor has successfully finished recovery.
*/
def recoveryFinished: Boolean =
_currentState == recoverySucceeded
/**
* Returns the current persistent message if there is one.
*/
implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent)
_currentState == processing
/**
* Marks a persistent message, identified by `sequenceNr`, as deleted. A message marked as deleted is
@ -289,23 +204,20 @@ trait Processor extends Actor with Stash with StashFactory {
* Processors that want to re-receive that persistent message during recovery should not call
* this method.
*
* Later extensions may also allow a replay of messages that have been marked as deleted which can
* be useful in debugging environments.
*
* @param sequenceNr sequence number of the persistent message to be deleted.
* @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted.
*/
def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = {
journal ! Delete(processorId, sequenceNr, sequenceNr, permanent)
journal ! DeleteMessages(List(PersistentIdImpl(processorId, sequenceNr)), permanent)
}
/**
* Marks all persistent messages with sequence numbers less than or equal `toSequenceNr` as deleted.
* Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
*/
def deleteMessages(toSequenceNr: Long): Unit = {
deleteMessages(toSequenceNr, false)
deleteMessages(toSequenceNr, true)
}
/**
@ -313,59 +225,11 @@ trait Processor extends Actor with Stash with StashFactory {
* is set to `false`, the persistent messages are marked as deleted in the journal, otherwise
* they permanently deleted from the journal.
*
* Later extensions may also allow a replay of messages that have been marked as deleted which can
* be useful in debugging environments.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
* @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted.
*/
def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = {
journal ! Delete(processorId, 1L, toSequenceNr, permanent)
}
/**
* Saves a `snapshot` of this processor's state. If saving succeeds, this processor will receive a
* [[SaveSnapshotSuccess]] message, otherwise a [[SaveSnapshotFailure]] message.
*/
def saveSnapshot(snapshot: Any): Unit = {
snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot)
}
/**
* Deletes a snapshot identified by `sequenceNr` and `timestamp`.
*/
def deleteSnapshot(sequenceNr: Long, timestamp: Long): Unit = {
snapshotStore ! DeleteSnapshot(SnapshotMetadata(processorId, sequenceNr, timestamp))
}
/**
* Deletes all snapshots matching `criteria`.
*/
def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
snapshotStore ! DeleteSnapshots(processorId, criteria)
}
/**
* INTERNAL API.
*/
protected[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent Unit): Unit = try {
_currentPersistent = persistent
updateLastSequenceNr(persistent)
body(persistent)
} finally _currentPersistent = null
/**
* INTERNAL API.
*/
protected[persistence] def updateLastSequenceNr(persistent: Persistent) {
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = {
_currentState.aroundReceive(receive, message)
journal ! DeleteMessagesTo(processorId, toSequenceNr, permanent)
}
/**
@ -387,15 +251,15 @@ trait Processor extends Actor with Stash with StashFactory {
*/
final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try {
processorStash.prepend(processorBatch.map(p Envelope(p, p.sender, context.system)))
processorStash.unstashAll()
receiverStash.prepend(processorBatch.map(p Envelope(p, p.sender, context.system)))
receiverStash.unstashAll()
unstashAll(unstashFilterPredicate)
} finally {
message match {
case Some(WriteSuccess(m)) preRestartDefault(reason, Some(m))
case Some(LoopSuccess(m)) preRestartDefault(reason, Some(m))
case Some(Replayed(m)) preRestartDefault(reason, Some(m))
case mo preRestartDefault(reason, None)
case Some(WriteMessageSuccess(m)) preRestartDefault(reason, Some(m))
case Some(LoopMessageSuccess(m)) preRestartDefault(reason, Some(m))
case Some(ReplayedMessage(m)) preRestartDefault(reason, Some(m))
case mo preRestartDefault(reason, None)
}
}
}
@ -429,36 +293,44 @@ trait Processor extends Actor with Stash with StashFactory {
}
private def nextSequenceNr(): Long = {
_sequenceNr += 1L
_sequenceNr
sequenceNr += 1L
sequenceNr
}
// -----------------------------------------------------
// Processor-internal stash
// -----------------------------------------------------
private val unstashFilterPredicate: Any Boolean = {
case _: WriteSuccess false
case _: Replayed false
case _ true
case _: WriteMessageSuccess false
case _: ReplayedMessage false
case _ true
}
private val processorStash = createStash()
private def currentEnvelope: Envelope =
context.asInstanceOf[ActorCell].currentMessage
}
/**
* Sent to a [[Processor]] when a journal failed to write a [[Persistent]] message. If
* Sent to a [[Processor]] if a journal fails to write a [[Persistent]] message. If
* not handled, an `akka.actor.ActorKilledException` is thrown by that processor.
*
* @param payload payload of the persistent message.
* @param sequenceNr sequence number of the persistent message.
* @param cause failure cause.
*/
@SerialVersionUID(1L)
case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
/**
* Sent to a [[Processor]] if a journal fails to replay messages or fetch that processor's
* highest sequence number. If not handled, a [[RecoveryException]] is thrown by that
* processor.
*/
@SerialVersionUID(1L)
case class RecoveryFailure(cause: Throwable)
/**
* Thrown by a [[Processor]] if a journal fails to replay messages or fetch that processor's
* highest sequence number. This exception is only thrown if that processor doesn't handle
* [[RecoveryFailure]] messages.
*/
@SerialVersionUID(1L)
case class RecoveryException(message: String, cause: Throwable) extends AkkaException(message, cause)
/**
* Java API: an actor that persists (journals) messages of type [[Persistent]]. Messages of other types
* are not persisted.
@ -513,9 +385,4 @@ case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
* @see [[Recover]]
* @see [[PersistentBatch]]
*/
abstract class UntypedProcessor extends UntypedActor with Processor {
/**
* Java API. returns the current persistent message or `null` if there is none.
*/
def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null)
}
abstract class UntypedProcessor extends UntypedActor with Processor