+ LambdaDoc samples now in the docs project = simplified internal state by removing recoveryPending = recovery is now triggered in around* method, so user is free to use preStart freely - recovery works even if one forgets to call super on preStart
This commit is contained in:
parent
f38af5fd1a
commit
33fbfec222
23 changed files with 218 additions and 342 deletions
|
|
@ -80,7 +80,8 @@ private[akka] object PersistentView {
|
|||
* - [[autoUpdate]] for turning automated updates on or off
|
||||
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
|
||||
*/
|
||||
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity
|
||||
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
||||
with PersistenceIdentity with PersistenceRecovery
|
||||
with ActorLogging {
|
||||
import PersistentView._
|
||||
import JournalProtocol._
|
||||
|
|
@ -97,7 +98,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
|
||||
private var _lastSequenceNr: Long = 0L
|
||||
private val internalStash = createStash()
|
||||
private var currentState: State = recoveryPending
|
||||
private var currentState: State = recoveryStarted(Long.MaxValue)
|
||||
|
||||
/**
|
||||
* View id is used as identifier for snapshots performed by this [[PersistentView]].
|
||||
|
|
@ -155,7 +156,10 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
* implementation classes to return non-default values.
|
||||
*/
|
||||
def autoUpdateReplayMax: Long =
|
||||
viewSettings.autoUpdateReplayMax
|
||||
viewSettings.autoUpdateReplayMax match {
|
||||
case -1 ⇒ Long.MaxValue
|
||||
case value ⇒ value
|
||||
}
|
||||
|
||||
/**
|
||||
* Highest received sequence number so far or `0L` if this actor hasn't replayed
|
||||
|
|
@ -174,21 +178,26 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
private def updateLastSequenceNr(persistent: PersistentRepr): Unit =
|
||||
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
|
||||
|
||||
override def recovery = Recovery(replayMax = autoUpdateReplayMax)
|
||||
|
||||
/**
|
||||
* Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax`
|
||||
* messages (following that snapshot).
|
||||
*/
|
||||
override def preStart(): Unit = {
|
||||
super.preStart()
|
||||
self ! Recover(replayMax = autoUpdateReplayMax)
|
||||
if (autoUpdate) schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval,
|
||||
self, ScheduledUpdate(autoUpdateReplayMax)))
|
||||
startRecovery(recovery)
|
||||
if (autoUpdate)
|
||||
schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, ScheduledUpdate(autoUpdateReplayMax)))
|
||||
}
|
||||
|
||||
private def startRecovery(recovery: Recovery): Unit = {
|
||||
changeState(recoveryStarted(recovery.replayMax))
|
||||
loadSnapshot(snapshotterId, recovery.fromSnapshot, recovery.toSequenceNr)
|
||||
}
|
||||
|
||||
/** INTERNAL API. */
|
||||
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = {
|
||||
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
|
||||
currentState.stateReceive(receive, message)
|
||||
}
|
||||
|
||||
/** INTERNAL API. */
|
||||
override protected[akka] def aroundPreStart(): Unit = {
|
||||
|
|
@ -230,22 +239,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
def recoveryRunning: Boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* Initial state, waits for `Recover` request, and then submits a `LoadSnapshot` request to the snapshot
|
||||
* store and changes to `recoveryStarted` state. All incoming messages except `Recover` are stashed.
|
||||
*/
|
||||
private def recoveryPending = new State {
|
||||
override def toString: String = "recovery pending"
|
||||
override def recoveryRunning: Boolean = true
|
||||
|
||||
override def stateReceive(receive: Receive, message: Any): Unit = message match {
|
||||
case Recover(fromSnap, toSnr, replayMax) ⇒
|
||||
changeState(recoveryStarted(replayMax))
|
||||
loadSnapshot(snapshotterId, fromSnap, toSnr)
|
||||
case _ ⇒ internalStash.stash()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer`
|
||||
* message to the actor's `receive`. Then initiates a message replay, either starting
|
||||
|
|
@ -260,7 +253,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
override def recoveryRunning: Boolean = true
|
||||
|
||||
override def stateReceive(receive: Receive, message: Any) = message match {
|
||||
case r: Recover ⇒ // ignore
|
||||
case LoadSnapshotResult(sso, toSnr) ⇒
|
||||
sso.foreach {
|
||||
case SelectedSnapshot(metadata, snapshot) ⇒
|
||||
|
|
@ -306,7 +298,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
case ReplayMessagesFailure(cause) ⇒
|
||||
try onReplayError(cause) finally onReplayComplete()
|
||||
case ScheduledUpdate(_) ⇒ // ignore
|
||||
case r: Recover ⇒ // ignore
|
||||
case Update(a, _) ⇒
|
||||
if (a)
|
||||
internalStash.stash()
|
||||
|
|
@ -349,7 +340,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
// Recover(lastSequenceNr) is sent by preRestart
|
||||
setLastSequenceNr(Long.MaxValue)
|
||||
case ReplayMessagesSuccess ⇒ replayCompleted(receive)
|
||||
case r: Recover ⇒ // ignore
|
||||
case _ ⇒ internalStash.stash()
|
||||
}
|
||||
|
||||
|
|
@ -371,7 +361,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
|
|||
override def recoveryRunning: Boolean = false
|
||||
|
||||
override def stateReceive(receive: Receive, message: Any): Unit = message match {
|
||||
case r: Recover ⇒ // ignore
|
||||
case ScheduledUpdate(replayMax) ⇒ changeStateToReplayStarted(await = false, replayMax)
|
||||
case Update(awaitUpdate, replayMax) ⇒ changeStateToReplayStarted(awaitUpdate, replayMax)
|
||||
case other ⇒ PersistentView.super.aroundReceive(receive, other)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue