per #15423 Remove deprecated features from akka-persistence

* remove channels
* remove View
* remove Processor
* collapse the complicated internal state management
  that was spread out between Processor, Eventsourced and Recovery
* remove Recovery trait, this caused some duplication between Eventsourced
  and PersistentView, but but the enhanced PersistentView will not be based
  on recovery infrastructure, and therefore PersistentView code will be replaced anyway
* remove PersistentBatch
* remove LoopMessage
* remove deleteMessages of individual messages
* remove Persistent, PersistentRepr and PersistentImpl are kept
* remove processorId
* update doc sample code
* note in migration guide about persistenceId
* rename Resequencable to PersistentEnvelope
This commit is contained in:
Patrik Nordwall 2014-12-08 11:02:14 +01:00
parent 86a5b3d9d7
commit c566d5a106
84 changed files with 2162 additions and 9560 deletions

View file

@ -5,9 +5,16 @@
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.persistence.JournalProtocol._
import scala.util.control.NonFatal
import akka.actor.AbstractActor
import akka.actor.Actor
import akka.actor.ActorCell
import akka.actor.ActorKilledException
import akka.actor.Cancellable
import akka.actor.Stash
import akka.actor.StashFactory
import akka.actor.UntypedActor
import akka.dispatch.Envelope
/**
* Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with
@ -17,8 +24,7 @@ import akka.persistence.JournalProtocol._
*
* @param await if `true`, processing of further messages sent to the view will be delayed until the
* incremental message replay, triggered by this update request, completes. If `false`,
* any message sent to the view may interleave with replayed [[Persistent]] message
* stream.
* any message sent to the view may interleave with replayed persistent event stream.
* @param replayMax maximum number of messages to replay when handling this update request. Defaults
* to `Long.MaxValue` (i.e. no limit).
*/
@ -68,81 +74,21 @@ object Update {
* - [[autoUpdate]] for turning automated updates on or off
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*/
trait PersistentView extends Actor with Recovery {
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory {
import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult
import context.dispatcher
/**
* INTERNAL API.
*
* Extends the `replayStarted` state of [[Recovery]] with logic to handle [[Update]] requests
* sent by users.
*/
private[persistence] override def replayStarted(await: Boolean) = new State {
private var delegateAwaiting = await
private var delegate = PersistentView.super.replayStarted(await)
override def toString: String = delegate.toString
override def aroundReceive(receive: Receive, message: Any) = message match {
case Update(false, _) // ignore
case u @ Update(true, _) if !delegateAwaiting
delegateAwaiting = true
delegate = PersistentView.super.replayStarted(await = true)
delegate.aroundReceive(receive, u)
case other
delegate.aroundReceive(receive, other)
}
}
/**
* When receiving an [[Update]] request, switches to `replayStarted` state and triggers
* an incremental message replay. Invokes the actor's current behavior for any other
* received message.
*/
private val idle: State = new State {
override def toString: String = "idle"
def aroundReceive(receive: Receive, message: Any): Unit = message match {
case r: Recover // ignore
case Update(awaitUpdate, replayMax)
_currentState = replayStarted(await = awaitUpdate)
journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
case other process(receive, other)
}
}
/**
* INTERNAL API.
*/
private[persistence] def onReplaySuccess(receive: Receive, await: Boolean): Unit =
onReplayComplete(await)
/**
* INTERNAL API.
*/
private[persistence] def onReplayFailure(receive: Receive, await: Boolean, cause: Throwable): Unit =
onReplayComplete(await)
/**
* Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`.
*/
private def onReplayComplete(await: Boolean): Unit = {
_currentState = idle
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax)))
if (await) receiverStash.unstashAll()
}
/**
* INTERNAL API
* WARNING: This implementation UNWRAPS PERSISTENT() before delivering to the receive block.
*/
override private[persistence] def runReceive(receive: Receive)(msg: Persistent): Unit =
receive.applyOrElse(msg.payload, unhandled)
private val extension = Persistence(context.system)
private val viewSettings = extension.settings.view
private lazy val journal = extension.journalFor(persistenceId)
private var schedule: Option[Cancellable] = None
private var _lastSequenceNr: Long = 0L
private val internalStash = createStash()
private var currentState: State = recoveryPending
/**
* View id is used as identifier for snapshots performed by this [[PersistentView]].
* This allows the View to keep separate snapshots of data than the [[PersistentActor]] originating the message stream.
@ -165,6 +111,11 @@ trait PersistentView extends Actor with Recovery {
*/
def viewId: String
/**
* Id of the persistent entity for which messages should be replayed.
*/
def persistenceId: String
/**
* Returns `viewId`.
*/
@ -174,8 +125,7 @@ trait PersistentView extends Actor with Recovery {
* If `true`, the currently processed message was persisted (is sent from the Journal).
* If `false`, the currently processed message comes from another actor (from "user-land").
*/
def isPersistent: Boolean =
currentPersistentMessage.isDefined
def isPersistent: Boolean = currentState.recoveryRunning
/**
* If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`.
@ -202,6 +152,23 @@ trait PersistentView extends Actor with Recovery {
def autoUpdateReplayMax: Long =
viewSettings.autoUpdateReplayMax
/**
* Highest received sequence number so far or `0L` if this actor hasn't replayed
* any persistent events yet.
*/
def lastSequenceNr: Long = _lastSequenceNr
/**
* Returns `lastSequenceNr`.
*/
def snapshotSequenceNr: Long = lastSequenceNr
private def setLastSequenceNr(value: Long): Unit =
_lastSequenceNr = value
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
/**
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
* messages (following that snapshot).
@ -211,14 +178,205 @@ trait PersistentView extends Actor with Recovery {
self ! Recover(replayMax = autoUpdateReplayMax)
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = {
currentState.stateReceive(receive, message)
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try receiverStash.unstashAll() finally super.preRestart(reason, message)
try internalStash.unstashAll() finally super.preRestart(reason, message)
}
override def postStop(): Unit = {
schedule.foreach(_.cancel())
super.postStop()
}
override def unhandled(message: Any): Unit = {
message match {
case RecoveryCompleted // mute
case RecoveryFailure(cause)
val errorMsg = s"PersistentView killed after recovery failure (persisten id = [${persistenceId}]). " +
"To avoid killing persistent actors on recovery failure, a PersistentView must handle RecoveryFailure messages. " +
"RecoveryFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
case m super.unhandled(m)
}
}
private def changeState(state: State): Unit = {
currentState = state
}
// TODO There are some duplication of the recovery state management here and in Eventsourced.scala,
// but the enhanced PersistentView will not be based on recovery infrastructure, and
// therefore this code will be replaced anyway
private trait State {
def stateReceive(receive: Receive, message: Any): Unit
def recoveryRunning: Boolean
}
/**
* Initial state, waits for `Recover` request, and then submits a `LoadSnapshot` request to the snapshot
* store and changes to `recoveryStarted` state. All incoming messages except `Recover` are stashed.
*/
private def recoveryPending = new State {
override def toString: String = "recovery pending"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case Recover(fromSnap, toSnr, replayMax)
changeState(recoveryStarted(replayMax))
loadSnapshot(snapshotterId, fromSnap, toSnr)
case _ internalStash.stash()
}
}
/**
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
* message to the actor's `receiveRecover`. Then initiates a message replay, either starting
* from the loaded snapshot or from scratch, and switches to `replayStarted` state.
* All incoming messages are stashed.
*
* @param replayMax maximum number of messages to replay.
*/
private def recoveryStarted(replayMax: Long) = new State {
override def toString: String = s"recovery started (replayMax = [${replayMax}])"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case r: Recover // ignore
case LoadSnapshotResult(sso, toSnr)
sso.foreach {
case SelectedSnapshot(metadata, snapshot)
setLastSequenceNr(metadata.sequenceNr)
// Since we are recovering we can ignore the receive behavior from the stack
PersistentView.super.aroundReceive(receive, SnapshotOffer(metadata, snapshot))
}
changeState(replayStarted(await = true))
journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self)
case other internalStash.stash()
}
}
/**
* Processes replayed messages, if any. The actor's `receiveRecover` is invoked with the replayed
* events.
*
* If replay succeeds it switches to `initializing` state and requests the highest stored sequence
* number from the journal. Otherwise RecoveryFailure is emitted.
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`.
*
* If processing of a replayed event fails, the exception is caught and
* stored for being thrown later and state is changed to `recoveryFailed`.
*
* All incoming messages are stashed.
*/
private def replayStarted(await: Boolean) = new State {
override def toString: String = s"replay started"
override def recoveryRunning: Boolean = true
private var stashUpdate = await
override def stateReceive(receive: Receive, message: Any) = message match {
case Update(false, _) // ignore
case u @ Update(true, _) if !stashUpdate
stashUpdate = true
internalStash.stash()
case r: Recover // ignore
case ReplayedMessage(p)
try {
updateLastSequenceNr(p)
PersistentView.super.aroundReceive(receive, p.payload)
} catch {
case NonFatal(t)
val currentMsg = context.asInstanceOf[ActorCell].currentMessage
changeState(replayFailed(t, currentMsg)) // delay throwing exception to prepareRestart
}
case ReplayMessagesSuccess
onReplayComplete(await)
case ReplayMessagesFailure(cause)
onReplayComplete(await)
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
PersistentView.super.aroundReceive(receive, RecoveryFailure(cause))
case other
internalStash.stash()
}
/**
* Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`.
*/
private def onReplayComplete(await: Boolean): Unit = {
changeState(idle)
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax)))
if (await) internalStash.unstashAll()
}
}
/**
* Consumes remaining replayed messages and then changes to `prepareRestart`. The
* message that caused the exception during replay, is re-added to the mailbox and
* re-received in `prepareRestart` state.
*/
private def replayFailed(cause: Throwable, failureMessage: Envelope) = new State {
override def toString: String = "replay failed"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayMessagesFailure(_)
replayCompleted()
// journal couldn't tell the maximum stored sequence number, hence the next
// replay must be a full replay (up to the highest stored sequence number)
// Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue)
case ReplayMessagesSuccess replayCompleted()
case ReplayedMessage(p) updateLastSequenceNr(p)
case r: Recover // ignore
case _ internalStash.stash()
}
def replayCompleted(): Unit = {
changeState(prepareRestart(cause))
mailbox.enqueueFirst(self, failureMessage)
}
}
/**
* Re-receives the replayed message that caused an exception and re-throws that exception.
*/
private def prepareRestart(cause: Throwable) = new State {
override def toString: String = "prepare restart"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(_) throw cause
case _ // ignore
}
}
/**
* When receiving an [[Update]] request, switches to `replayStarted` state and triggers
* an incremental message replay. Invokes the actor's current behavior for any other
* received message.
*/
private val idle: State = new State {
override def toString: String = "idle"
override def recoveryRunning: Boolean = false
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case r: Recover // ignore
case Update(awaitUpdate, replayMax)
changeState(replayStarted(await = awaitUpdate))
journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
case other PersistentView.super.aroundReceive(receive, other)
}
}
}
/**