/** * Copyright (C) 2009-2015 Typesafe Inc. */ package akka.persistence import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.actor.AbstractActor import akka.actor.Actor import akka.actor.Cancellable import akka.actor.Stash import akka.actor.StashFactory import akka.actor.UntypedActor import akka.dispatch.Envelope import akka.actor.ActorLogging /** * Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with * all messages from the corresponding persistent id's journal that have not yet been consumed by the view. * To update a view with messages that have been written after handling this request, another `Update` * request must be sent to the view. * * @param await if `true`, processing of further messages sent to the view will be delayed until the * incremental message replay, triggered by this update request, completes. If `false`, * any message sent to the view may interleave with replayed persistent event stream. * @param replayMax maximum number of messages to replay when handling this update request. Defaults * to `Long.MaxValue` (i.e. no limit). */ @SerialVersionUID(1L) final case class Update(await: Boolean = false, replayMax: Long = Long.MaxValue) object Update { /** * Java API. */ def create() = Update() /** * Java API. */ def create(await: Boolean) = Update(await) /** * Java API. */ def create(await: Boolean, replayMax: Long) = Update(await, replayMax) } /** * INTERNAL API */ private[akka] object PersistentView { private final case class ScheduledUpdate(replayMax: Long) } /** * A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive * the message stream directly from the Journal. These messages can be processed to update internal state * in order to maintain an (eventual consistent) view of the state of the corresponding persistent actor. A * persistent view can also run on a different node, provided that a replicated journal is used. * * Implementation classes refer to a persistent actors' message stream by implementing `persistenceId` * with the corresponding (shared) identifier value. * * Views can also store snapshots of internal state by calling [[autoUpdate]]. The snapshots of a view * are independent of those of the referenced persistent actor. During recovery, a saved snapshot is offered * to the view with a [[SnapshotOffer]] message, followed by replayed messages, if any, that are younger * than the snapshot. Default is to offer the latest saved snapshot. * * By default, a view automatically updates itself with an interval returned by `autoUpdateInterval`. * This method can be overridden by implementation classes to define a view instance-specific update * interval. The default update interval for all views of an actor system can be configured with the * `akka.persistence.view.auto-update-interval` configuration key. Applications may trigger additional * view updates by sending the view [[Update]] requests. See also methods * * - [[autoUpdate]] for turning automated updates on or off * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle */ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity with ActorLogging { import PersistentView._ import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult import context.dispatcher private val extension = Persistence(context.system) private val viewSettings = extension.settings.view private[persistence] lazy val journal = extension.journalFor(journalPluginId) private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId) private var schedule: Option[Cancellable] = None private var _lastSequenceNr: Long = 0L private val internalStash = createStash() private var currentState: State = recoveryPending /** * View id is used as identifier for snapshots performed by this [[PersistentView]]. * This allows the View to keep separate snapshots of data than the [[PersistentActor]] originating the message stream. * * * The usual case is to have a *different* id set as `viewId` than `persistenceId`, * although it is possible to share the same id with an [[PersistentActor]] - for example to decide about snapshots * based on some average or sum, calculated by this view. * * Example: * {{{ * class SummingView extends PersistentView { * override def persistenceId = "count-123" * override def viewId = "count-123-sum" // this view is performing summing, * // so this view's snapshots reside under the "-sum" suffixed id * * // ... * } * }}} */ def viewId: String /** * Returns `viewId`. */ def snapshotterId: String = viewId /** * If `true`, the currently processed message was persisted (is sent from the Journal). * If `false`, the currently processed message comes from another actor (from "user-land"). */ def isPersistent: Boolean = currentState.recoveryRunning /** * If `true`, this view automatically updates itself with an interval specified by `autoUpdateInterval`. * If `false`, applications must explicitly update this view by sending [[Update]] requests. The default * value can be configured with the `akka.persistence.view.auto-update` configuration key. This method * can be overridden by implementation classes to return non-default values. */ def autoUpdate: Boolean = viewSettings.autoUpdate /** * The interval for automated updates. The default value can be configured with the * `akka.persistence.view.auto-update-interval` configuration key. This method can be * overridden by implementation classes to return non-default values. */ def autoUpdateInterval: FiniteDuration = viewSettings.autoUpdateInterval /** * The maximum number of messages to replay per update. The default value can be configured with the * `akka.persistence.view.auto-update-replay-max` configuration key. This method can be overridden by * implementation classes to return non-default values. */ def autoUpdateReplayMax: Long = viewSettings.autoUpdateReplayMax /** * Highest received sequence number so far or `0L` if this actor hasn't replayed * any persistent events yet. */ def lastSequenceNr: Long = _lastSequenceNr /** * Returns `lastSequenceNr`. */ def snapshotSequenceNr: Long = lastSequenceNr private def setLastSequenceNr(value: Long): Unit = _lastSequenceNr = value private def updateLastSequenceNr(persistent: PersistentRepr): Unit = if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr /** * Triggers an initial recovery, starting form a snapshot, if any, and replaying at most `autoUpdateReplayMax` * messages (following that snapshot). */ override def preStart(): Unit = { super.preStart() self ! Recover(replayMax = autoUpdateReplayMax) if (autoUpdate) schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval, self, ScheduledUpdate(autoUpdateReplayMax))) } /** INTERNAL API. */ override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = { currentState.stateReceive(receive, message) } /** INTERNAL API. */ override protected[akka] def aroundPreStart(): Unit = { // Fail fast on missing plugins. val j = journal; val s = snapshotStore super.aroundPreStart() } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { try internalStash.unstashAll() finally super.preRestart(reason, message) } override def postStop(): Unit = { schedule.foreach(_.cancel()) super.postStop() } /** * Called whenever a message replay fails. By default it logs the error. * Subclass may override to customize logging. * The `PersistentView` will not stop or throw exception due to this. * It will try again on next update. */ protected def onReplayError(cause: Throwable): Unit = { log.error(cause, "Persistence view failure when replaying events for persistenceId [{}]. " + "Last known sequence number [{}]", persistenceId, lastSequenceNr) } private def changeState(state: State): Unit = { currentState = state } // TODO There are some duplication of the recovery state management here and in Eventsourced.scala, // but the enhanced PersistentView will not be based on recovery infrastructure, and // therefore this code will be replaced anyway private trait State { def stateReceive(receive: Receive, message: Any): Unit def recoveryRunning: Boolean } /** * Initial state, waits for `Recover` request, and then submits a `LoadSnapshot` request to the snapshot * store and changes to `recoveryStarted` state. All incoming messages except `Recover` are stashed. */ private def recoveryPending = new State { override def toString: String = "recovery pending" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any): Unit = message match { case Recover(fromSnap, toSnr, replayMax) ⇒ changeState(recoveryStarted(replayMax)) loadSnapshot(snapshotterId, fromSnap, toSnr) case _ ⇒ internalStash.stash() } } /** * Processes a loaded snapshot, if any. A loaded snapshot is offered with a `SnapshotOffer` * message to the actor's `receive`. Then initiates a message replay, either starting * from the loaded snapshot or from scratch, and switches to `replayStarted` state. * All incoming messages are stashed. * * @param replayMax maximum number of messages to replay. */ private def recoveryStarted(replayMax: Long) = new State { override def toString: String = s"recovery started (replayMax = [${replayMax}])" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { case r: Recover ⇒ // ignore case LoadSnapshotResult(sso, toSnr) ⇒ sso.foreach { case SelectedSnapshot(metadata, snapshot) ⇒ setLastSequenceNr(metadata.sequenceNr) PersistentView.super.aroundReceive(receive, SnapshotOffer(metadata, snapshot)) } changeState(replayStarted(await = true)) journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) case other ⇒ internalStash.stash() } } /** * Processes replayed messages, if any. The actor's `receive` is invoked with the replayed * events. * * If replay succeeds it switches to `initializing` state and requests the highest stored sequence * number from the journal. * * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayError` is called and * remaining replay events are consumed (ignored). * * If processing of a replayed event fails, the exception is caught and * stored for later and state is changed to `recoveryFailed`. * * All incoming messages are stashed when `await` is true. */ private def replayStarted(await: Boolean) = new State { override def toString: String = s"replay started" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { case ReplayedMessage(p) ⇒ try { updateLastSequenceNr(p) PersistentView.super.aroundReceive(receive, p.payload) } catch { case NonFatal(t) ⇒ changeState(ignoreRemainingReplay(t)) } case ReplayMessagesSuccess ⇒ onReplayComplete() case ReplayMessagesFailure(cause) ⇒ try onReplayError(cause) finally onReplayComplete() case ScheduledUpdate(_) ⇒ // ignore case r: Recover ⇒ // ignore case Update(a, _) ⇒ if (a) internalStash.stash() case other ⇒ if (await) internalStash.stash() else { try { PersistentView.super.aroundReceive(receive, other) } catch { case NonFatal(t) ⇒ changeState(ignoreRemainingReplay(t)) } } } /** * Switches to `idle` */ private def onReplayComplete(): Unit = { changeState(idle) internalStash.unstashAll() } } /** * Consumes remaining replayed messages and then throw the exception. */ private def ignoreRemainingReplay(cause: Throwable) = new State { override def toString: String = "replay failed" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { case ReplayedMessage(p) ⇒ case ReplayMessagesFailure(_) ⇒ replayCompleted(receive) // journal couldn't tell the maximum stored sequence number, hence the next // replay must be a full replay (up to the highest stored sequence number) // Recover(lastSequenceNr) is sent by preRestart setLastSequenceNr(Long.MaxValue) case ReplayMessagesSuccess ⇒ replayCompleted(receive) case r: Recover ⇒ // ignore case _ ⇒ internalStash.stash() } def replayCompleted(receive: Receive): Unit = { // in case the actor resumes the state must be `idle` changeState(idle) internalStash.unstashAll() throw cause } } /** * When receiving an [[Update]] request, switches to `replayStarted` state and triggers * an incremental message replay. Invokes the actor's current behavior for any other * received message. */ private val idle: State = new State { override def toString: String = "idle" override def recoveryRunning: Boolean = false override def stateReceive(receive: Receive, message: Any): Unit = message match { case r: Recover ⇒ // ignore case ScheduledUpdate(replayMax) ⇒ changeStateToReplayStarted(await = false, replayMax) case Update(awaitUpdate, replayMax) ⇒ changeStateToReplayStarted(awaitUpdate, replayMax) case other ⇒ PersistentView.super.aroundReceive(receive, other) } def changeStateToReplayStarted(await: Boolean, replayMax: Long): Unit = { changeState(replayStarted(await)) journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self) } } } /** * Java API. * * @see [[PersistentView]] */ abstract class UntypedPersistentView extends UntypedActor with PersistentView /** * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]) * * @see [[PersistentView]] */ abstract class AbstractPersistentView extends AbstractActor with PersistentView