=per #15942 Support resume after recovery failure

* also improved fault handling in various places (bugs found)

* and manually triggered Update must be distinguished from scheduled
  auto updates, otherwise manual Update will schedule extra auto updates
This commit is contained in:
Patrik Nordwall 2014-12-14 21:45:22 +01:00
parent 72d54626f3
commit 9b5a446a4a
8 changed files with 239 additions and 95 deletions

View file

@ -120,6 +120,10 @@ When persisting events with ``persist`` it is guaranteed that the persistent act
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`.
This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining
``supervisorStrategy`` in parent actor.
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples in Java with Lambdas <http://www.typesafe.com/activator/template/akka-sample-persistence-java-lambda>`_.
It contains instructions on how to run the ``PersistentActorExample``.
@ -192,11 +196,12 @@ recovery has completed, before processing any other message sent to the persiste
The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery
and before any other received messages.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed
If there is a problem with recovering the state of the actor from the journal, the actor will be
sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped.
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed
Relaxed local consistency requirements and high throughput use-cases
--------------------------------------------------------------------

View file

@ -122,6 +122,10 @@ When persisting events with ``persist`` it is guaranteed that the persistent act
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`.
This can be customized by handling ``PersistenceFailure`` message in ``onReceiveCommand`` and/or defining
``supervisorStrategy`` in parent actor.
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Java <http://www.typesafe.com/activator/template/akka-sample-persistence-java>`_.
It contains instructions on how to run the ``PersistentActorExample``.
@ -195,11 +199,11 @@ recovery has completed, before processing any other message sent to the persiste
The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery
and before any other received messages.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed
If there is a problem with recovering the state of the actor from the journal, the actor will be
sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`.
.. _persist-async-java:

View file

@ -114,6 +114,10 @@ When persisting events with ``persist`` it is guaranteed that the persistent act
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`.
This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining
``supervisorStrategy`` in parent actor.
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Scala <http://www.typesafe.com/activator/template/akka-sample-persistence-scala>`_.
It contains instructions on how to run the ``PersistentActorExample``.
@ -186,11 +190,11 @@ recovery has completed, before processing any other message sent to the persiste
The persistent actor will receive a special :class:`RecoveryCompleted` message right after recovery
and before any other received messages.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed
If there is a problem with recovering the state of the actor from the journal, the actor will be
sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`.
.. _persist-async-scala:

View file

@ -8,10 +8,10 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.actor.ActorKilledException
import akka.actor.ActorLogging
import akka.actor.Stash
import akka.actor.StashFactory
import akka.dispatch.Envelope
import akka.event.Logging
import akka.event.LoggingAdapter
/**
* INTERNAL API
@ -105,8 +105,10 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
private[persistence] def onReplayFailure(cause: Throwable): Unit = ()
/**
* User-overridable callback. Called when a persistent actor is started. Default implementation sends
* a `Recover()` to `self`.
* User-overridable callback. Called when a persistent actor is started or restarted.
* Default implementation sends a `Recover()` to `self`. Note that if you override
* `preStart` (or `preRestart`) and not call `super.preStart` you must send
* a `Recover()` message to `self` to activate the persistent actor.
*/
@throws(classOf[Exception])
override def preStart(): Unit =
@ -143,18 +145,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
}
}
/**
* User-overridable callback. Called before a persistent actor is restarted. Default implementation sends
* a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`.
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
message match {
case Some(_) self ! Recover(toSequenceNr = lastSequenceNr)
case None self ! Recover()
}
}
/**
* INTERNAL API.
*/
@ -166,7 +156,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def unhandled(message: Any): Unit = {
message match {
case RecoveryCompleted // mute
case RecoveryCompleted | ReadHighestSequenceNrSuccess | ReadHighestSequenceNrFailure // mute
case RecoveryFailure(cause)
val errorMsg = s"PersistentActor killed after recovery failure (persisten id = [${persistenceId}]). " +
"To avoid killing persistent actors on recovery failure, a PersistentActor must handle RecoveryFailure messages. " +
@ -203,6 +193,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
writeInProgress = true
}
private def log: LoggingAdapter = Logging(context.system, this)
/**
* Recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
@ -212,8 +204,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If recovery fails, the actor will be stopped. This can be customized by
* handling [[RecoveryFailure]].
* If recovery fails, the actor will be stopped by throwing ActorKilledException.
* This can be customized by handling [[RecoveryFailure]] message in `receiveRecover`
* and/or defining `supervisorStrategy` in parent actor.
*
* @see [[Recover]]
*/
@ -241,8 +234,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
* If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException.
* This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]]
* and/or defining `supervisorStrategy` in parent actor.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
@ -268,7 +262,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* Unlike `persist` the persistent actor will continue to receive incoming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
@ -277,8 +271,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
* If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException.
* This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]]
* and/or defining `supervisorStrategy` in parent actor.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
@ -308,14 +303,14 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by
* throwing ActorKilledException, thus the handlers will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: A Unit): Unit = {
@ -336,14 +331,14 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by
* throwing ActorKilledException, thus the handlers will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param events event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: immutable.Seq[A])(handler: A Unit): Unit =
@ -483,8 +478,11 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
changeState(initializing(recoveryBehavior))
journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self)
case ReplayMessagesFailure(cause)
// in case the actor resumes the state must be initializing
changeState(initializing(recoveryBehavior))
journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self)
onReplayFailure(cause) // callback for subclass implementation
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None))
case other
internalStash.stash()
@ -502,19 +500,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p) updateLastSequenceNr(p)
case ReplayMessagesFailure(_)
replayCompleted()
// journal couldn't tell the maximum stored sequence number, hence the next
// replay must be a full replay (up to the highest stored sequence number)
// Recover(lastSequenceNr) is sent by preRestart
setLastSequenceNr(Long.MaxValue)
case ReplayMessagesSuccess replayCompleted()
case ReplayMessagesSuccess | ReplayMessagesFailure(_) replayCompleted()
case r: Recover // ignore
case _ internalStash.stash()
}
def replayCompleted(): Unit = {
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
// in case the actor resumes the state must be initializing
changeState(initializing(recoveryBehavior))
journal ! ReadHighestSequenceNr(failed.sequenceNr, persistenceId, self)
Eventsourced.super.aroundReceive(recoveryBehavior,
RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload))))
}
@ -533,10 +528,13 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
case ReadHighestSequenceNrSuccess(highest)
changeState(processingCommands)
sequenceNr = highest
setLastSequenceNr(highest)
internalStash.unstashAll()
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
case ReadHighestSequenceNrFailure(cause)
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None))
log.error(cause, "PersistentActor could not retrieve highest sequence number and must " +
"therefore be stopped. (persisten id = [{}]).", persistenceId)
context.stop(self)
case other
internalStash.stash()
}
@ -552,26 +550,34 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
// while message is in flight, in that case we ignore the call to the handler
if (id == instanceId) {
updateLastSequenceNr(p)
try pendingInvocations.peek().handler(p.payload) finally onWriteMessageComplete()
try {
pendingInvocations.peek().handler(p.payload)
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) onWriteMessageComplete(err = true); throw e }
}
case WriteMessageFailure(p, cause, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == instanceId) {
try {
Eventsourced.super.aroundReceive(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) // stops actor by default
onWriteMessageComplete()
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) onWriteMessageComplete(err = true); throw e }
}
case LoopMessageSuccess(l, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case we ignore the call to the handler
if (id == instanceId) {
try pendingInvocations.peek().handler(l) finally onWriteMessageComplete()
try {
pendingInvocations.peek().handler(l)
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) onWriteMessageComplete(err = true); throw e }
}
case WriteMessagesSuccessful | WriteMessagesFailed(_)
case WriteMessagesSuccessful | WriteMessagesFailed(_) // FIXME PN: WriteMessagesFailed?
if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch()
}
def onWriteMessageComplete(): Unit =
def onWriteMessageComplete(err: Boolean): Unit =
pendingInvocations.pop()
}
@ -585,15 +591,20 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def stateReceive(receive: Receive, message: Any) =
if (common.isDefinedAt(message)) common(message)
else doAroundReceive(receive, message)
private def doAroundReceive(receive: Receive, message: Any): Unit = {
else try {
Eventsourced.super.aroundReceive(receive, message)
aroundReceiveComplete(err = false)
} catch { case NonFatal(e) aroundReceiveComplete(err = true); throw e }
private def aroundReceiveComplete(err: Boolean): Unit = {
if (eventBatch.nonEmpty) flushBatch()
if (pendingStashingPersistInvocations > 0) changeState(persistingEvents)
else internalStash.unstash()
if (pendingStashingPersistInvocations > 0)
changeState(persistingEvents)
else if (err)
internalStash.unstashAll()
else
internalStash.unstash()
}
private def flushBatch() {
@ -637,7 +648,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
if (common.isDefinedAt(message)) common(message)
else internalStash.stash()
override def onWriteMessageComplete(): Unit = {
override def onWriteMessageComplete(err: Boolean): Unit = {
pendingInvocations.pop() match {
case _: StashingHandlerInvocation
// enables an early return to `processingCommands`, because if this counter hits `0`,
@ -649,7 +660,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
if (pendingStashingPersistInvocations == 0) {
changeState(processingCommands)
internalStash.unstash()
if (err) internalStash.unstashAll()
else internalStash.unstash()
}
}

View file

@ -174,7 +174,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced {
* JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* Unlike `persist` the persistent actor will continue to receive incoming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
@ -212,14 +212,14 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced {
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
@ -234,14 +234,14 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced {
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param events event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =
@ -317,7 +317,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* Unlike `persist` the persistent actor will continue to receive incoming commands between the
* call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees.
*
@ -339,14 +339,14 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param event event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
@ -361,14 +361,14 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param events event to be handled in the future, when preceding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =

View file

@ -50,6 +50,13 @@ object Update {
Update(await, replayMax)
}
/**
* INTERNAL API
*/
private[akka] object PersistentView {
private final case class ScheduledUpdate(replayMax: Long)
}
/**
* A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive
* the message stream directly from the Journal. These messages can be processed to update internal state
@ -74,6 +81,7 @@ object Update {
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*/
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory {
import PersistentView._
import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult
import context.dispatcher
@ -175,6 +183,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
override def preStart(): Unit = {
super.preStart()
self ! Recover(replayMax = autoUpdateReplayMax)
if (autoUpdate) schedule = Some(context.system.scheduler.schedule(autoUpdateInterval, autoUpdateInterval,
self, ScheduledUpdate(autoUpdateReplayMax)))
}
/**
@ -282,6 +292,7 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
private var stashUpdate = await
override def stateReceive(receive: Receive, message: Any) = message match {
case ScheduledUpdate(_) // ignore
case Update(false, _) // ignore
case u @ Update(true, _) if !stashUpdate
stashUpdate = true
@ -299,7 +310,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
onReplayComplete(await)
case ReplayMessagesFailure(cause)
onReplayComplete(await)
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(None))
case other
internalStash.stash()
@ -310,8 +320,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
*/
private def onReplayComplete(await: Boolean): Unit = {
changeState(idle)
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self,
Update(await = false, autoUpdateReplayMax)))
if (await) internalStash.unstashAll()
}
}
@ -339,7 +347,9 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
}
def replayCompleted(receive: Receive): Unit = {
// FIXME what happens if RecoveryFailure is handled, i.e. actor is not stopped?
// in case the actor resumes the state must be `idle`
changeState(idle)
PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload))))
}
}
@ -355,11 +365,15 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case r: Recover // ignore
case Update(awaitUpdate, replayMax)
changeState(replayStarted(await = awaitUpdate))
journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
case ScheduledUpdate(replayMax) changeStateToReplayStarted(await = false, replayMax)
case Update(awaitUpdate, replayMax) changeStateToReplayStarted(awaitUpdate, replayMax)
case other PersistentView.super.aroundReceive(receive, other)
}
def changeStateToReplayStarted(await: Boolean, replayMax: Long): Unit = {
changeState(replayStarted(await))
journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self)
}
}
}

View file

@ -34,6 +34,7 @@ object AtLeastOnceDeliveryFailureSpec {
case object Start
case class Done(ints: Vector[Int])
case class Ack(i: Int)
case class ProcessingFailure(i: Int)
case class JournalingFailure(i: Int)
@ -77,9 +78,11 @@ object AtLeastOnceDeliveryFailureSpec {
val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate
if (contains(i)) {
log.debug(debugMessage(s"ignored duplicate ${i}"))
sender() ! Ack(i)
} else {
persist(MsgSent(i)) { evt
updateState(evt)
sender() ! Ack(i)
if (shouldFail(failureRate))
throw new TestException(debugMessage(s"failed at payload ${i}"))
else
@ -142,16 +145,26 @@ object AtLeastOnceDeliveryFailureSpec {
class ChaosApp(probe: ActorRef) extends Actor with ActorLogging {
val destination = context.actorOf(Props(classOf[ChaosDestination], probe), "destination")
val snd = context.actorOf(Props(classOf[ChaosSender], destination, probe), "sender")
var snd = createSender()
var acks = Set.empty[Int]
def createSender(): ActorRef =
context.watch(context.actorOf(Props(classOf[ChaosSender], destination, probe), "sender"))
def receive = {
case Start 1 to numMessages foreach (snd ! _)
case Ack(i) acks += i
case ProcessingFailure(i)
snd ! i
log.debug(s"resent ${i} after processing failure")
case JournalingFailure(i)
snd ! i
log.debug(s"resent ${i} after journaling failure")
case Terminated(_)
// snd will be stopped if ReadHighestSequenceNr fails
log.debug(s"sender stopped, starting it again")
snd = createSender()
1 to numMessages foreach (i if (!acks(i)) snd ! i)
}
}
}

View file

@ -23,6 +23,8 @@ object PersistentActorFailureSpec {
import PersistentActorSpec.Evt
import PersistentActorSpec.ExamplePersistentActor
class SimulatedException(msg: String) extends RuntimeException(msg) with NoStackTrace
class FailingInmemJournal extends AsyncWriteProxy {
import AsyncWriteProxy.SetStore
@ -38,13 +40,13 @@ object PersistentActorFailureSpec {
class FailingInmemStore extends InmemStore {
def failingReceive: Receive = {
case w: WriteMessages if isWrong(w)
throw new RuntimeException("Simulated Store failure") with NoStackTrace
throw new SimulatedException("Simulated Store failure")
case ReplayMessages(pid, fromSnr, toSnr, max)
val readFromStore = read(pid, fromSnr, toSnr, max)
if (readFromStore.length == 0)
sender() ! ReplaySuccess
else if (isCorrupt(readFromStore))
sender() ! ReplayFailure(new IllegalArgumentException(s"blahonga $fromSnr $toSnr"))
sender() ! ReplayFailure(new SimulatedException(s"blahonga $fromSnr $toSnr"))
else {
readFromStore.foreach(sender() ! _)
sender() ! ReplaySuccess
@ -79,6 +81,15 @@ object PersistentActorFailureSpec {
}
}
class ResumingSupervisor(testActor: ActorRef) extends Supervisor(testActor) {
override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e
testActor ! e
SupervisorStrategy.Resume
}
}
class FailingRecovery(name: String, recoveryFailureProbe: Option[ActorRef]) extends ExamplePersistentActor(name) {
def this(name: String) = this(name, None)
@ -88,17 +99,34 @@ object PersistentActorFailureSpec {
val failingRecover: Receive = {
case Evt(data) if data == "bad"
throw new RuntimeException("Simulated exception from receiveRecover")
throw new SimulatedException("Simulated exception from receiveRecover")
case r @ RecoveryFailure(cause) if recoveryFailureProbe.isDefined
recoveryFailureProbe.foreach { _ ! r }
throw new ActorKilledException(cause.getMessage)
}
override def receiveRecover: Receive = failingRecover orElse super.receiveRecover
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
}
class ThrowingActor1(name: String) extends ExamplePersistentActor(name) {
override val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
persist(Evt(s"${data}"))(updateState)
if (data == "err")
throw new SimulatedException("Simulated exception 1")
}
}
class ThrowingActor2(name: String) extends ExamplePersistentActor(name) {
override val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
persist(Evt(s"${data}")) { evt
if (data == "err")
throw new SimulatedException("Simulated exception 1")
updateState(evt)
}
}
}
}
@ -164,11 +192,75 @@ class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem"
val probe = TestProbe()
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref))
expectMsgType[ActorRef]
expectMsgType[ActorKilledException]
val recoveryFailure = probe.expectMsgType[RecoveryFailure]
recoveryFailure.payload should be(Some(Evt("bad")))
recoveryFailure.sequenceNr should be(Some(3L))
}
"continue by handling RecoveryFailure" in {
prepareFailingRecovery()
// recover by creating another with same name
val probe = TestProbe()
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref))
val persistentActor = expectMsgType[ActorRef]
val recoveryFailure = probe.expectMsgType[RecoveryFailure]
// continue
persistentActor ! Cmd("d")
persistentActor ! GetState
// "bad" failed, and "c" was not replayed
expectMsg(List("a", "b", "d"))
}
"support resume after recovery failure" in {
prepareFailingRecovery()
// recover by creating another with same name
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[FailingRecovery], name)
val persistentActor = expectMsgType[ActorRef]
expectMsgType[ActorKilledException] // from supervisor
// resume
persistentActor ! Cmd("d")
persistentActor ! GetState
// "bad" failed, and "c" was not replayed
expectMsg(List("a", "b", "d"))
}
"support resume after persist failure" in {
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name)
val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a")
persistentActor ! Cmd("wrong")
persistentActor ! Cmd("b")
// Behavior1PersistentActor persists 2 events per Cmd,
// and therefore 2 exceptions from supervisor
expectMsgType[ActorKilledException]
expectMsgType[ActorKilledException]
persistentActor ! Cmd("c")
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2", "c-1", "c-2"))
}
"support resume when persist followed by exception" in {
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor1], name)
val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a")
persistentActor ! Cmd("err")
persistentActor ! Cmd("b")
expectMsgType[SimulatedException] // from supervisor
persistentActor ! Cmd("c")
persistentActor ! GetState
expectMsg(List("a", "err", "b", "c"))
}
"support resume when persist handler throws exception" in {
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor2], name)
val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a")
persistentActor ! Cmd("b")
persistentActor ! Cmd("err")
persistentActor ! Cmd("c")
expectMsgType[SimulatedException] // from supervisor
persistentActor ! Cmd("d")
persistentActor ! GetState
expectMsg(List("a", "b", "c", "d"))
}
}
}